From dc180cad641495cf5ed8c401bfcd502e706d75bd Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Wed, 5 Jul 2023 14:38:13 +0800
Subject: [PATCH] *: add test for misusing keyspace ID when creating the client (#6754)

ref tikv/pd#6747, ref tikv/pd#6748, ref tikv/pd#6749

Signed-off-by: Ryan Leung
---
 .gitignore                                    |   1 +
 pkg/keyspace/tso_keyspace_group.go            |  39 ++++++
 pkg/keyspace/util.go                          |   3 +
 .../mcs/tso/keyspace_group_manager_test.go    | 123 +++++++++++-------
 4 files changed, 125 insertions(+), 41 deletions(-)

diff --git a/.gitignore b/.gitignore
index 93e6189a687..748d24872b6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,3 +24,4 @@ report.xml
 coverage.xml
 coverage
 *.txt
+go.work*
diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go
index dd9319e806f..d319798738b 100644
--- a/pkg/keyspace/tso_keyspace_group.go
+++ b/pkg/keyspace/tso_keyspace_group.go
@@ -17,6 +17,7 @@ package keyspace
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"sort"
 	"strconv"
 	"strings"
@@ -25,6 +26,7 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/kvproto/pkg/tsopb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/balancer"
 	"github.com/tikv/pd/pkg/mcs/discovery"
@@ -1010,3 +1012,40 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
 		zap.Reflect("merge-list", mergeList))
 	return nil
 }
+
+// GetKeyspaceGroupPrimaryByID returns the address of the primary node of the
+// keyspace group with the given ID. It returns ErrKeyspaceGroupNotExists when
+// the group does not exist, and ErrKeyspaceGroupPrimaryNotFound when nothing is
+// stored under the group's primary path.
+func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
+	// check if the keyspace group exists
+	kg, err := m.GetKeyspaceGroupByID(id)
+	if err != nil {
+		return "", err
+	}
+	if kg == nil {
+		return "", ErrKeyspaceGroupNotExists(id)
+	}
+
+	// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
+	// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
+	path := fmt.Sprintf("/ms/%d/tso/00000/primary", m.clusterID)
+	if id != utils.DefaultKeyspaceGroupID {
+		path = fmt.Sprintf("/ms/%d/tso/keyspace_groups/election/%05d/primary", m.clusterID, id)
+	}
+	leader := &tsopb.Participant{}
+	ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, path, leader)
+	if err != nil {
+		return "", err
+	}
+	if !ok {
+		return "", ErrKeyspaceGroupPrimaryNotFound
+	}
+	// The format of leader name is address-groupID. The address itself may
+	// contain "-" (e.g. a hyphenated host name), so cut at the last one.
+	name := leader.GetName()
+	if idx := strings.LastIndex(name, "-"); idx >= 0 {
+		name = name[:idx]
+	}
+	return name, nil
+}
diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go
index 240306f8124..100b0eb6986 100644
--- a/pkg/keyspace/util.go
+++ b/pkg/keyspace/util.go
@@ -94,6 +94,9 @@ var (
 	}
 	// Only keyspaces in the state specified by allowChangeConfig are allowed to change their config.
 	allowChangeConfig = []keyspacepb.KeyspaceState{keyspacepb.KeyspaceState_ENABLED, keyspacepb.KeyspaceState_DISABLED}
+
+	// ErrKeyspaceGroupPrimaryNotFound is used to indicate primary of target keyspace group does not exist.
+	ErrKeyspaceGroupPrimaryNotFound = errors.New("primary of keyspace group does not exist")
 )
 
 // validateID check if keyspace falls within the acceptable range.
diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
index a20eb33fb81..52974e3155f 100644
--- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go
+++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
@@ -493,6 +493,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))
 
 	// Init api server config but not start.
 	tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) {
@@ -503,21 +504,6 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
 	re.NoError(err)
 	pdAddr := tc.GetConfig().GetClientURL()
 
-	// Start pd client and wait pd server start.
-	var clients sync.Map
-	go func() {
-		apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2.
-		cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
-		re.NoError(err)
-		clients.Store("keyspace_b", cli)
-	}()
-	go func() {
-		apiCtx := pd.NewAPIContextV2("keyspace_a") // its keyspace id is 1.
-		cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
-		re.NoError(err)
-		clients.Store("keyspace_a", cli)
-	}()
-
 	// Start api server and tso server.
 	err = tc.RunInitialServers()
 	re.NoError(err)
@@ -531,20 +517,6 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
 	defer tsoCluster.Destroy()
 	tsoCluster.WaitForDefaultPrimaryServing(re)
 
-	// Wait pd clients are ready.
-	testutil.Eventually(re, func() bool {
-		count := 0
-		clients.Range(func(_, _ interface{}) bool {
-			count++
-			return true
-		})
-		return count == 2
-	})
-	clientA, ok := clients.Load("keyspace_a")
-	re.True(ok)
-	clientB, ok := clients.Load("keyspace_b")
-	re.True(ok)
-
 	// First split keyspace group 0 to 1 with keyspace 2.
 	kgm := leaderServer.GetServer().GetKeyspaceGroupManager()
 	re.NotNil(kgm)
@@ -553,13 +525,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
 		return err == nil
 	})
 
-	// Trigger checkTSOSplit to ensure the split is finished.
-	testutil.Eventually(re, func() bool {
-		_, _, err = clientB.(pd.Client).GetTS(ctx)
-		return err == nil
-	})
 	waitFinishSplit(re, leaderServer, 0, 1, []uint32{mcsutils.DefaultKeyspaceID, 1}, []uint32{2})
-	clientB.(pd.Client).Close()
 
 	// Then split keyspace group 0 to 2 with keyspace 1.
 	testutil.Eventually(re, func() bool {
@@ -567,13 +533,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
 		return err == nil
 	})
 
-	// Trigger checkTSOSplit to ensure the split is finished.
-	testutil.Eventually(re, func() bool {
-		_, _, err = clientA.(pd.Client).GetTS(ctx)
-		return err == nil
-	})
 	waitFinishSplit(re, leaderServer, 0, 2, []uint32{mcsutils.DefaultKeyspaceID}, []uint32{1})
-	clientA.(pd.Client).Close()
 
 	// Check the keyspace group 0 is split to 1 and 2.
 	kg0 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
@@ -586,6 +546,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
 	re.False(kg1.IsSplitting())
 	re.False(kg2.IsSplitting())
 
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller"))
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
 }
 
@@ -724,3 +685,83 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient()
 	cancel()
 	wg.Wait()
 }
+
+// TestGetTSOImmediately checks that a newly created client of a keyspace in a
+// non-default keyspace group can get a TSO right away, even when group 0 and
+// that group are served by different primary nodes.
+// See https://github.com/tikv/pd/issues/6748
+func TestGetTSOImmediately(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck", `return(true)`))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))
+
+	// Init api server config but not start.
+	tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) {
+		conf.Keyspace.PreAlloc = []string{
+			"keyspace_a", "keyspace_b",
+		}
+	})
+	re.NoError(err)
+	pdAddr := tc.GetConfig().GetClientURL()
+
+	// Start api server and tso server.
+	err = tc.RunInitialServers()
+	re.NoError(err)
+	defer tc.Destroy()
+	tc.WaitLeader()
+	leaderServer := tc.GetServer(tc.GetLeader())
+	re.NoError(leaderServer.BootstrapCluster())
+
+	tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr)
+	re.NoError(err)
+	defer tsoCluster.Destroy()
+	tsoCluster.WaitForDefaultPrimaryServing(re)
+
+	// First split keyspace group 0 to 1 with keyspace 2.
+	kgm := leaderServer.GetServer().GetKeyspaceGroupManager()
+	re.NotNil(kgm)
+	testutil.Eventually(re, func() bool {
+		err = kgm.SplitKeyspaceGroupByID(0, 1, []uint32{2})
+		return err == nil
+	})
+
+	waitFinishSplit(re, leaderServer, 0, 1, []uint32{mcsutils.DefaultKeyspaceID, 1}, []uint32{2})
+
+	kg0 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
+	kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 1)
+	re.Equal([]uint32{0, 1}, kg0.Keyspaces)
+	re.Equal([]uint32{2}, kg1.Keyspaces)
+	re.False(kg0.IsSplitting())
+	re.False(kg1.IsSplitting())
+
+	// Let group 0 and group 1 have different primary node.
+	kgm.SetPriorityForKeyspaceGroup(0, kg0.Members[0].Address, 100)
+	kgm.SetPriorityForKeyspaceGroup(1, kg1.Members[1].Address, 100)
+	testutil.Eventually(re, func() bool {
+		// GetKeyspaceGroupPrimaryByID returns ErrKeyspaceGroupPrimaryNotFound
+		// when no primary is stored for the group; keep polling in that case
+		// rather than failing the test on the first miss.
+		p0, err := kgm.GetKeyspaceGroupPrimaryByID(0)
+		if err != nil {
+			return false
+		}
+		p1, err := kgm.GetKeyspaceGroupPrimaryByID(1)
+		if err != nil {
+			return false
+		}
+		return p0 == kg0.Members[0].Address && p1 == kg1.Members[1].Address
+	}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))
+
+	apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2.
+	cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
+	re.NoError(err)
+	defer cli.Close()
+	_, _, err = cli.GetTS(ctx)
+	re.NoError(err)
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller"))
+}