From f0694110209d3f68f2b9dab5ce147d291fd4cbfa Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Tue, 13 Jun 2023 11:33:07 +0800
Subject: [PATCH] tests: make TestSplitKeyspaceGroup stable (#6584)

close tikv/pd#6571

Signed-off-by: lhy1024
---
 pkg/keyspace/keyspace_test.go               |  2 +-
 pkg/keyspace/tso_keyspace_group.go          | 55 ++++++++++++++-------
 pkg/keyspace/tso_keyspace_group_test.go     |  2 +-
 server/cluster/cluster.go                   |  2 +-
 tests/pdctl/keyspace/keyspace_group_test.go | 51 +++++++++++++++++--
 tests/pdctl/scheduler/scheduler_test.go     |  9 +---
 6 files changed, 89 insertions(+), 32 deletions(-)

diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go
index 948fe434088..dadc2a2509f 100644
--- a/pkg/keyspace/keyspace_test.go
+++ b/pkg/keyspace/keyspace_test.go
@@ -80,7 +80,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
 	allocator := mockid.NewIDAllocator()
 	kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
 	suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm)
-	suite.NoError(kgm.Bootstrap())
+	suite.NoError(kgm.Bootstrap(suite.ctx))
 	suite.NoError(suite.manager.Bootstrap())
 }
 
diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go
index 2a477c3dea1..efe905b6439 100644
--- a/pkg/keyspace/tso_keyspace_group.go
+++ b/pkg/keyspace/tso_keyspace_group.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"encoding/json"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -51,9 +52,11 @@ const (
 
 // GroupManager is the manager of keyspace group related data.
 type GroupManager struct {
-	ctx    context.Context
-	cancel context.CancelFunc
-	wg     sync.WaitGroup
+	ctx       context.Context
+	cancel    context.CancelFunc
+	wg        sync.WaitGroup
+	client    *clientv3.Client
+	clusterID uint64
 
 	sync.RWMutex
 	// groups is the cache of keyspace group related information.
@@ -90,24 +93,24 @@ func NewKeyspaceGroupManager(
 		cancel:             cancel,
 		store:              store,
 		groups:             groups,
+		client:             client,
+		clusterID:          clusterID,
 		nodesBalancer:      balancer.GenByPolicy[string](defaultBalancerPolicy),
 		serviceRegistryMap: make(map[string]string),
 	}
 
 	// If the etcd client is not nil, start the watch loop for the registered tso servers.
 	// The PD(TSO) Client relies on this info to discover tso servers.
-	if client != nil {
-		m.initTSONodesWatcher(client, clusterID)
-		m.wg.Add(2)
+	if m.client != nil {
+		m.initTSONodesWatcher(m.client, m.clusterID)
+		m.wg.Add(1)
 		go m.tsoNodesWatcher.StartWatchLoop()
-		go m.allocNodesToAllKeyspaceGroups()
 	}
-
 	return m
 }
 
 // Bootstrap saves default keyspace group info and init group mapping in the memory.
-func (m *GroupManager) Bootstrap() error {
+func (m *GroupManager) Bootstrap(ctx context.Context) error {
 	// Force the membership restriction that the default keyspace must belong to default keyspace group.
 	// Have no information to specify the distribution of the default keyspace group replicas, so just
 	// leave the replica/member list empty. The TSO service will assign the default keyspace group replica
@@ -137,6 +140,11 @@ func (m *GroupManager) Bootstrap() error {
 		m.groups[userKind].Put(group)
 	}
 
+	// It will only alloc nodes when the group manager is on the API leader.
+	if m.client != nil {
+		m.wg.Add(1)
+		go m.allocNodesToAllKeyspaceGroups(ctx)
+	}
 	return nil
 }
 
@@ -146,7 +154,7 @@ func (m *GroupManager) Close() {
 	m.wg.Wait()
 }
 
-func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
+func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
 	defer logutil.LogPanic()
 	defer m.wg.Done()
 	ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval)
@@ -158,7 +166,7 @@
 	log.Info("start to alloc nodes to all keyspace groups")
 	for {
 		select {
-		case <-m.ctx.Done():
+		case <-ctx.Done():
 			log.Info("stop to alloc nodes to all keyspace groups")
 			return
 		case <-ticker.C:
@@ -338,11 +346,6 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
 				Members:   keyspaceGroup.Members,
 				Keyspaces: keyspaceGroup.Keyspaces,
 			}
-			if oldKG.IsSplitting() {
-				newKG.SplitState = &endpoint.SplitState{
-					SplitSource: oldKG.SplitState.SplitSource,
-				}
-			}
 			err = m.store.SaveKeyspaceGroup(txn, newKG)
 			if err != nil {
 				return err
@@ -380,6 +383,8 @@ func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind)
 	return config, nil
 }
 
+var failpointOnce sync.Once
+
 // UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
 func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error {
 	// when server is not in API mode, we don't need to update the keyspace for keyspace group
@@ -391,6 +396,12 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI
 		return err
 	}
 
+	failpoint.Inject("externalAllocNode", func(val failpoint.Value) {
+		failpointOnce.Do(func() {
+			addrs := val.(string)
+			m.SetNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, strings.Split(addrs, ","))
+		})
+	})
 	m.Lock()
 	defer m.Unlock()
 	return m.updateKeyspaceForGroupLocked(userKind, id, keyspaceID, mutation)
@@ -425,7 +436,6 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,
 		if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
 			return err
 		}
-		m.groups[userKind].Put(kg)
 	}
 
 	return nil
@@ -696,8 +706,10 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
 func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
 	m.Lock()
 	defer m.Unlock()
-	return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
-		kg, err := m.store.LoadKeyspaceGroup(txn, id)
+	var kg *endpoint.KeyspaceGroup
+	err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
+		var err error
+		kg, err = m.store.LoadKeyspaceGroup(txn, id)
 		if err != nil {
 			return err
 		}
@@ -714,6 +726,11 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
 		kg.Members = members
 		return m.store.SaveKeyspaceGroup(txn, kg)
 	})
+	if err != nil {
+		return err
+	}
+	m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
+	return nil
 }
 
 // IsExistNode checks if the node exists.
diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go
index 214c10bea7f..df1aa49ee37 100644
--- a/pkg/keyspace/tso_keyspace_group_test.go
+++ b/pkg/keyspace/tso_keyspace_group_test.go
@@ -48,7 +48,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
 	idAllocator := mockid.NewIDAllocator()
 	cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
 	suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm)
-	suite.NoError(suite.kgm.Bootstrap())
+	suite.NoError(suite.kgm.Bootstrap(suite.ctx))
 }
 
 func (suite *keyspaceGroupTestSuite) TearDownTest() {
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index e5426f2828c..7238b121076 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -281,7 +281,7 @@ func (c *RaftCluster) Start(s Server) error {
 	}
 
 	if s.IsAPIServiceMode() {
-		err = c.keyspaceGroupManager.Bootstrap()
+		err = c.keyspaceGroupManager.Bootstrap(c.ctx)
 		if err != nil {
 			return err
 		}
diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go
index 141c8205cba..133edb78579 100644
--- a/tests/pdctl/keyspace/keyspace_group_test.go
+++ b/tests/pdctl/keyspace/keyspace_group_test.go
@@ -90,14 +90,14 @@ func TestKeyspaceGroup(t *testing.T) {
 }
 
 func TestSplitKeyspaceGroup(t *testing.T) {
-	t.Skip("skip this super flaky split keyspace group test which impacts everyone's productivity.")
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
 	re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
 	keyspaces := make([]string, 0)
-	for i := 0; i < 500; i++ {
+	// We test a case that exceeds the default max txn ops limit in etcd, which is 128.
+	for i := 0; i < 129; i++ {
 		keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
 	}
 	tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) {
@@ -127,8 +127,53 @@ func TestSplitKeyspaceGroup(t *testing.T) {
 		output, err := pdctl.ExecuteCommand(cmd, args...)
 		re.NoError(err)
 		return strings.Contains(string(output), "Success")
-	}, testutil.WithWaitFor(20*time.Second))
+	})
+
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
+}
+
+func TestExternalAllocNodeWhenStart(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	// Nodes are allocated externally for the keyspace group. When the keyspace manager updates keyspace info to the keyspace group,
+	// we expect the keyspace group to be updated correctly.
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/externalAllocNode", `return("127.0.0.1:2379,127.0.0.1:2380")`))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
+	keyspaces := make([]string, 0)
+	for i := 0; i < 10; i++ {
+		keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
+	}
+	tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) {
+		conf.Keyspace.PreAlloc = keyspaces
+	})
+	re.NoError(err)
+	err = tc.RunInitialServers()
+	re.NoError(err)
+	pdAddr := tc.GetConfig().GetClientURL()
+
+	cmd := pdctlCmd.GetRootCmd()
+
+	time.Sleep(2 * time.Second)
+	tc.WaitLeader()
+	leaderServer := tc.GetServer(tc.GetLeader())
+	re.NoError(leaderServer.BootstrapCluster())
+
+	// check keyspace group information.
+	defaultKeyspaceGroupID := fmt.Sprintf("%d", utils.DefaultKeyspaceGroupID)
+	args := []string{"-u", pdAddr, "keyspace-group"}
+	testutil.Eventually(re, func() bool {
+		output, err := pdctl.ExecuteCommand(cmd, append(args, defaultKeyspaceGroupID)...)
+		re.NoError(err)
+		var keyspaceGroup endpoint.KeyspaceGroup
+		err = json.Unmarshal(output, &keyspaceGroup)
+		re.NoError(err)
+		return len(keyspaceGroup.Keyspaces) == len(keyspaces)+1 && len(keyspaceGroup.Members) == 2
+	})
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/externalAllocNode"))
 
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
 	re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
 }
diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go
index 02e2eeb5f64..db5ea2efe72 100644
--- a/tests/pdctl/scheduler/scheduler_test.go
+++ b/tests/pdctl/scheduler/scheduler_test.go
@@ -31,7 +31,6 @@ import (
 )
 
 func TestScheduler(t *testing.T) {
-	t.Skip("skip this super unstable test which impacts everyone's productivity")
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -464,12 +463,8 @@
 	result := make(map[string]interface{})
 	testutil.Eventually(re, func() bool {
 		mightExec([]string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result)
-		return len(result) != 0
-	}, testutil.WithTickInterval(50*time.Millisecond))
-
-	testutil.Eventually(re, func() bool {
-		return result["status"] == "paused" && result["summary"] == ""
-	}, testutil.WithTickInterval(50*time.Millisecond))
+		return len(result) != 0 && result["status"] == "paused" && result["summary"] == ""
+	}, testutil.WithWaitFor(30*time.Second))
 
 	mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"})
 	mustExec([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil)