Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add test for misusing keyspace ID when creating the client #6754

Merged
merged 2 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ report.xml
coverage.xml
coverage
*.txt
go.work*
32 changes: 32 additions & 0 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package keyspace
import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
Expand All @@ -25,6 +26,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/balancer"
"github.com/tikv/pd/pkg/mcs/discovery"
Expand Down Expand Up @@ -1010,3 +1012,33 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
zap.Reflect("merge-list", mergeList))
return nil
}

// GetKeyspaceGroupPrimaryByID returns the primary node of the keyspace group by ID.
func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
// check if the keyspace group exists
kg, err := m.GetKeyspaceGroupByID(id)
if err != nil {
return "", err
}
if kg == nil {
return "", ErrKeyspaceGroupNotExists(id)
}

// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
path := fmt.Sprintf("/ms/%d/tso/00000/primary", m.clusterID)
if id != utils.DefaultKeyspaceGroupID {
path = fmt.Sprintf("/ms/%d/tso/keyspace_groups/election/%05d/primary", m.clusterID, id)
}
Comment on lines +1027 to +1032
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to move this part to pkg/storage/endpoint/key_path.go?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it waits for #6747 to do the changes.

leader := &tsopb.Participant{}
ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, path, leader)
if err != nil {
return "", err
}
if !ok {
return "", ErrKeyspaceGroupPrimaryNotFound
}
// The format of leader name is address-groupID.
contents := strings.Split(leader.GetName(), "-")
return contents[0], err
}
3 changes: 3 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ var (
}
// Only keyspaces in the state specified by allowChangeConfig are allowed to change their config.
allowChangeConfig = []keyspacepb.KeyspaceState{keyspacepb.KeyspaceState_ENABLED, keyspacepb.KeyspaceState_DISABLED}

// ErrKeyspaceGroupPrimaryNotFound is used to indicate primary of target keyspace group does not exist.
ErrKeyspaceGroupPrimaryNotFound = errors.New("primary of keyspace group does not exist")
)

// validateID check if keyspace falls within the acceptable range.
Expand Down
113 changes: 72 additions & 41 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))

// Init api server config but not start.
tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) {
Expand All @@ -503,21 +504,6 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

// Start pd client and wait pd server start.
var clients sync.Map
go func() {
apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
clients.Store("keyspace_b", cli)
}()
go func() {
apiCtx := pd.NewAPIContextV2("keyspace_a") // its keyspace id is 1.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
clients.Store("keyspace_a", cli)
}()

// Start api server and tso server.
err = tc.RunInitialServers()
re.NoError(err)
Expand All @@ -531,20 +517,6 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)

// Wait pd clients are ready.
testutil.Eventually(re, func() bool {
count := 0
clients.Range(func(_, _ interface{}) bool {
count++
return true
})
return count == 2
})
clientA, ok := clients.Load("keyspace_a")
re.True(ok)
clientB, ok := clients.Load("keyspace_b")
re.True(ok)

// First split keyspace group 0 to 1 with keyspace 2.
kgm := leaderServer.GetServer().GetKeyspaceGroupManager()
re.NotNil(kgm)
Expand All @@ -553,27 +525,15 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
return err == nil
})

// Trigger checkTSOSplit to ensure the split is finished.
testutil.Eventually(re, func() bool {
_, _, err = clientB.(pd.Client).GetTS(ctx)
return err == nil
})
waitFinishSplit(re, leaderServer, 0, 1, []uint32{mcsutils.DefaultKeyspaceID, 1}, []uint32{2})
clientB.(pd.Client).Close()

// Then split keyspace group 0 to 2 with keyspace 1.
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 2, []uint32{1})
return err == nil
})

// Trigger checkTSOSplit to ensure the split is finished.
testutil.Eventually(re, func() bool {
_, _, err = clientA.(pd.Client).GetTS(ctx)
return err == nil
})
waitFinishSplit(re, leaderServer, 0, 2, []uint32{mcsutils.DefaultKeyspaceID}, []uint32{1})
clientA.(pd.Client).Close()

// Check the keyspace group 0 is split to 1 and 2.
kg0 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
Expand All @@ -586,6 +546,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
re.False(kg1.IsSplitting())
re.False(kg2.IsSplitting())

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

Expand Down Expand Up @@ -724,3 +685,73 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient()
cancel()
wg.Wait()
}

// See https://github.com/tikv/pd/issues/6748
func TestGetTSOImmediately(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accurate

re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))

// Init api server config but not start.
tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = []string{
"keyspace_a", "keyspace_b",
}
})
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

// Start api server and tso server.
err = tc.RunInitialServers()
re.NoError(err)
defer tc.Destroy()
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)

// First split keyspace group 0 to 1 with keyspace 2.
kgm := leaderServer.GetServer().GetKeyspaceGroupManager()
re.NotNil(kgm)
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 1, []uint32{2})
return err == nil
})

waitFinishSplit(re, leaderServer, 0, 1, []uint32{mcsutils.DefaultKeyspaceID, 1}, []uint32{2})

kg0 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 1)
re.Equal([]uint32{0, 1}, kg0.Keyspaces)
re.Equal([]uint32{2}, kg1.Keyspaces)
re.False(kg0.IsSplitting())
re.False(kg1.IsSplitting())

// Let group 0 and group 1 have different primary node.
kgm.SetPriorityForKeyspaceGroup(0, kg0.Members[0].Address, 100)
kgm.SetPriorityForKeyspaceGroup(1, kg1.Members[1].Address, 100)
testutil.Eventually(re, func() bool {
p0, err := kgm.GetKeyspaceGroupPrimaryByID(0)
re.NoError(err)
p1, err := kgm.GetKeyspaceGroupPrimaryByID(1)
re.NoError(err)
return p0 == kg0.Members[0].Address && p1 == kg1.Members[1].Address
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
_, _, err = cli.GetTS(ctx)
re.NoError(err)
cli.Close()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller"))
}