From 63635eca624d85f30e2bba70bbe56be80cd34697 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 16 Jun 2023 13:31:43 +0800 Subject: [PATCH] mcs: some bug fixes for keyspace group (#118) * mcs, tso: fix potential inconsistency caused by non-atomic applying keyspace movement state change in the persistent store (#6596) ref tikv/pd#5895 fix potential inconsistency caused by non-atomically applying the state change in the persistent store in the following cases: 1. Keyspace group split/merge 2. Keyspace movement across keyspace groups. Signed-off-by: Bin Shi * client: fix keyspace update in `tsoSvcDiscovery` (#6612) close tikv/pd#6611 Signed-off-by: lhy1024 --------- Signed-off-by: Bin Shi Signed-off-by: lhy1024 Co-authored-by: Bin Shi <39923490+binshi-bing@users.noreply.github.com> --- client/client.go | 21 +++- client/tso_client.go | 4 +- pkg/balancer/round_robin.go | 3 +- pkg/tso/keyspace_group_manager.go | 18 ++- pkg/tso/keyspace_group_manager_test.go | 76 ++++++++++++- .../mcs/tso/keyspace_group_manager_test.go | 104 ++++++++++++++++++ 6 files changed, 212 insertions(+), 14 deletions(-) diff --git a/client/client.go b/client/client.go index f89ca65449c..2a75b0effc5 100644 --- a/client/client.go +++ b/client/client.go @@ -263,6 +263,14 @@ type serviceModeKeeper struct { tsoSvcDiscovery ServiceDiscovery } +func (k *serviceModeKeeper) SetKeyspaceID(keyspaceID uint32) { + k.Lock() + defer k.Unlock() + if k.serviceMode == pdpb.ServiceMode_API_SVC_MODE { + k.tsoSvcDiscovery.SetKeyspaceID(keyspaceID) + } +} + func (k *serviceModeKeeper) close() { k.Lock() defer k.Unlock() @@ -457,9 +465,6 @@ func newClientWithKeyspaceName( ctx context.Context, keyspaceName string, svrAddrs []string, security SecurityOption, opts ...ClientOption, ) (Client, error) { - log.Info("[pd] create pd client with endpoints and keyspace", - zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName)) - tlsCfg := &tlsutil.TLSConfig{ CAPath: security.CAPath, CertPath: security.CertPath, @@ -496,8 +501,12 @@ func newClientWithKeyspaceName( if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil { return nil, err } + // We call "c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)" after the service mode has already switched to API mode + // and the tso service discovery has already been initialized, so we need to set the tso_service_discovery's keyspace id here too. c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID) - + c.serviceModeKeeper.SetKeyspaceID(c.keyspaceID) + log.Info("[pd] create pd client with endpoints and keyspace", + zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName), zap.Uint32("keyspace-id", c.keyspaceID)) return c, nil } @@ -574,7 +583,7 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { ) switch newMode { case pdpb.ServiceMode_PD_SVC_MODE: - newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID, + newTSOCli = newTSOClient(c.ctx, c.option, c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{}) case pdpb.ServiceMode_API_SVC_MODE: newTSOSvcDiscovery = newTSOServiceDiscovery( c.ctx, MetaStorageClient(c), c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option) // At this point, the keyspace group isn't known yet. Starts from the default keyspace group, // and will be updated later. - newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID, + newTSOCli = newTSOClient(c.ctx, c.option, newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{}) if err := newTSOSvcDiscovery.Init(); err != nil { log.Error("[pd] failed to initialize tso service discovery.
keep the current service mode", diff --git a/client/tso_client.go b/client/tso_client.go index c326e3e7160..d4dfaa03a91 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -70,7 +70,6 @@ type tsoClient struct { wg sync.WaitGroup option *option - keyspaceID uint32 svcDiscovery ServiceDiscovery tsoStreamBuilderFactory // tsoAllocators defines the mapping {dc-location -> TSO allocator leader URL} @@ -94,7 +93,7 @@ type tsoClient struct { // newTSOClient returns a new TSO client. func newTSOClient( - ctx context.Context, option *option, keyspaceID uint32, + ctx context.Context, option *option, svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory, ) *tsoClient { ctx, cancel := context.WithCancel(ctx) @@ -102,7 +101,6 @@ func newTSOClient( ctx: ctx, cancel: cancel, option: option, - keyspaceID: keyspaceID, svcDiscovery: svcDiscovery, tsoStreamBuilderFactory: factory, checkTSDeadlineCh: make(chan struct{}), diff --git a/pkg/balancer/round_robin.go b/pkg/balancer/round_robin.go index cef35c43a5f..5013a447d3e 100644 --- a/pkg/balancer/round_robin.go +++ b/pkg/balancer/round_robin.go @@ -51,7 +51,8 @@ func (r *RoundRobin[T]) Next() (t T) { func (r *RoundRobin[T]) GetAll() []T { r.RLock() defer r.RUnlock() - return r.nodes + // return a copy to avoid data race + return append(r.nodes[:0:0], r.nodes...) } // Put puts one into balancer. diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 9eaea2bf48e..d962cfd0306 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -563,7 +563,14 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( i++ j++ } else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen { - delete(kgm.keyspaceLookupTable, oldKeyspaces[i]) + // kgm.keyspaceLookupTable is a global lookup table for all keyspace groups, storing the + // keyspace group ID for each keyspace. If the keyspace group of this keyspace in this + // lookup table isn't the current keyspace group, it means the keyspace has been moved + // to another keyspace group which has already declared the ownership of the keyspace, + // and we shouldn't delete and overwrite the ownership. + if curGroupID, ok := kgm.keyspaceLookupTable[oldKeyspaces[i]]; ok && curGroupID == groupID { + delete(kgm.keyspaceLookupTable, oldKeyspaces[i]) + } i++ } else { newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{} @@ -621,7 +628,8 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) { // if kid == kg.ID, it means the keyspace still belongs to this keyspace group, // so we decouple the relationship in the global keyspace lookup table. // if kid != kg.ID, it means the keyspace has been moved to another keyspace group - // which has already declared the ownership of the keyspace. + // which has already declared the ownership of the keyspace, so we don't need + // delete it from the global keyspace lookup table and overwrite the ownership. 
if kid == kg.ID { delete(kgm.keyspaceLookupTable, kid) } @@ -821,6 +829,12 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit( return err } if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 { + log.Debug("the split source TSO is not greater than the newly split TSO", + zap.Int64("split-source-tso-physical", splitSourceTSO.Physical), + zap.Int64("split-source-tso-logical", splitSourceTSO.Logical), + zap.Int64("split-tso-physical", splitTSO.Physical), + zap.Int64("split-tso-logical", splitTSO.Logical), + ) return nil } // If the split source TSO is greater than the newly split TSO, we need to update the split diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 18a09a82bd6..083bf566bf7 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -490,8 +490,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) - // Sleep for a while to wait for the events to propagate. If the restriction is not working, - // it will cause random failure. + // Sleep for a while to wait for the events to propagate. If the logic doesn't work + // as expected, it will cause random failure. time.Sleep(1 * time.Second) // Should still be able to get AM for keyspace 0 in keyspace group 0. am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck( @@ -508,6 +508,78 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { re.NotNil(kg) } +// TestKeyspaceMovementConsistency tests the consistency of keyspace movement. +// When a keyspace is moved from one keyspace group to another, the allocator manager +// updates the source group and target group state in etcd atomically. The TSO keyspace group +// manager watches the state change in the persistent store, but it is hard to apply the movement +// state change across two groups atomically. This test case verifies that the movement state is +// eventually consistent, for example, if a keyspace "move to group B" event is applied +// before the "move away from group A" event, the second event shouldn't overwrite the global +// state, such as the global keyspace group lookup table. +func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() { + re := suite.Require() + + mgr := suite.newUniqueKeyspaceGroupManager(1) + re.NotNil(mgr) + defer mgr.Close() + + rootPath := mgr.legacySvcRootPath + svcAddr := mgr.tsoServiceID.ServiceAddr + + var ( + am *AllocatorManager + kg *endpoint.KeyspaceGroup + kgid uint32 + err error + event *etcdEvent + ) + + // Create keyspace group 0 which contains keyspaces 0, 10, 20. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, + mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 10, 20}) + // Create keyspace group 1 which contains keyspaces 11, 21. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, + uint32(1), []uint32{11, 21}) + + err = mgr.Initialize() + re.NoError(err) + + // Should be able to get AM for keyspace 10 in keyspace group 0. + am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, mcsutils.DefaultKeyspaceGroupID) + re.NoError(err) + re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid) + re.NotNil(am) + re.NotNil(kg) + + // Move keyspace 10 from keyspace group 0 to keyspace group 1 and apply this state change + // to TSO first.
+ event = generateKeyspaceGroupPutEvent(1, []uint32{10, 11, 21}, []string{svcAddr}) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + // Wait until the keyspace 10 is served by keyspace group 1. + testutil.Eventually(re, func() bool { + _, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1) + return err == nil && kgid == 1 + }, testutil.WithWaitFor(3*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + event = generateKeyspaceGroupPutEvent( + mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 20}, []string{svcAddr}) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + + // Sleep for a while to wait for the events to propagate. If the restriction is not working, + // it will cause random failure. + time.Sleep(1 * time.Second) + // Should still be able to get AM for keyspace 10 in keyspace group 1. + _, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1) + re.NoError(err) + re.Equal(uint32(1), kgid) +} + // TestHandleTSORequestWithWrongMembership tests the case that HandleTSORequest receives // a tso request with mismatched keyspace and keyspace group. func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembership() { diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 8849f7a4ab5..ed3bfe35280 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -36,6 +36,7 @@ import ( "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/server/apiv2/handlers" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/integrations/mcs" handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers" @@ -465,3 +466,106 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) } + +func TestTwiceSplitKeyspaceGroup(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) + + // Init api server config but not start. + tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) { + conf.Keyspace.PreAlloc = []string{ + "keyspace_a", "keyspace_b", + } + }) + re.NoError(err) + pdAddr := tc.GetConfig().GetClientURL() + + // Start pd client and wait pd server start. + var clients sync.Map + go func() { + apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2. + cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{}) + re.NoError(err) + clients.Store("keyspace_b", cli) + }() + go func() { + apiCtx := pd.NewAPIContextV2("keyspace_a") // its keyspace id is 1. + cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{}) + re.NoError(err) + clients.Store("keyspace_a", cli) + }() + + // Start api server and tso server. 
+ err = tc.RunInitialServers() + re.NoError(err) + defer tc.Destroy() + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + + tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr) + re.NoError(err) + defer tsoCluster.Destroy() + tsoCluster.WaitForDefaultPrimaryServing(re) + + // Wait pd clients are ready. + testutil.Eventually(re, func() bool { + count := 0 + clients.Range(func(key, value interface{}) bool { + count++ + return true + }) + return count == 2 + }) + clientA, ok := clients.Load("keyspace_a") + re.True(ok) + clientB, ok := clients.Load("keyspace_b") + re.True(ok) + + // First split keyspace group 0 to 1 with keyspace 2. + kgm := leaderServer.GetServer().GetKeyspaceGroupManager() + re.NotNil(kgm) + testutil.Eventually(re, func() bool { + err = kgm.SplitKeyspaceGroupByID(0, 1, []uint32{2}) + return err == nil + }) + + // Trigger checkTSOSplit to ensure the split is finished. + testutil.Eventually(re, func() bool { + _, _, err = clientB.(pd.Client).GetTS(ctx) + re.NoError(err) + kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0) + return !kg.IsSplitting() + }) + clientB.(pd.Client).Close() + + // Then split keyspace group 0 to 2 with keyspace 1. + testutil.Eventually(re, func() bool { + err = kgm.SplitKeyspaceGroupByID(0, 2, []uint32{1}) + return err == nil + }) + + // Trigger checkTSOSplit to ensure the split is finished. + testutil.Eventually(re, func() bool { + _, _, err = clientA.(pd.Client).GetTS(ctx) + re.NoError(err) + kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0) + return !kg.IsSplitting() + }) + clientA.(pd.Client).Close() + + // Check the keyspace group 0 is split to 1 and 2. + kg0 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 1) + kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 2) + re.Equal([]uint32{0}, kg0.Keyspaces) + re.Equal([]uint32{2}, kg1.Keyspaces) + re.Equal([]uint32{1}, kg2.Keyspaces) + re.False(kg0.IsSplitting()) + re.False(kg1.IsSplitting()) + re.False(kg2.IsSplitting()) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) +}
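Note on the ownership check added in pkg/tso/keyspace_group_manager.go above: because the "move to group B" and "move away from group A" events may be applied in either order, a group removes a keyspace from the global lookup table only if that table still records the group as the owner. Below is a minimal, standalone Go sketch of that idea; the lookupTable type and the removeKeyspace helper are simplified illustrations, not the actual KeyspaceGroupManager API (in the real code the table is kgm.keyspaceLookupTable, as shown in the hunks above).

package main

import "fmt"

// lookupTable maps a keyspace ID to the ID of the keyspace group that
// currently owns it (a simplified stand-in for kgm.keyspaceLookupTable).
type lookupTable map[uint32]uint32

// removeKeyspace drops a keyspace from the table only if the table still
// records groupID as the owner. If another group has already claimed the
// keyspace, the newer ownership is left untouched.
func (t lookupTable) removeKeyspace(keyspaceID, groupID uint32) {
	if owner, ok := t[keyspaceID]; ok && owner == groupID {
		delete(t, keyspaceID)
	}
}

func main() {
	t := lookupTable{10: 0, 20: 0}
	// The "move to group 1" event is applied first and claims keyspace 10.
	t[10] = 1
	// The "move away from group 0" event is applied later; it must not
	// erase group 1's ownership.
	t.removeKeyspace(10, 0)
	fmt.Println(t[10]) // prints 1: group 1 still owns keyspace 10
}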
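The pkg/balancer/round_robin.go change makes GetAll return a copy so callers never share the balancer's backing array with concurrent Put/Delete calls. A small sketch of the append(s[:0:0], s...) idiom it uses follows; the copyNodes helper and the string element type are illustrative assumptions rather than the balancer's generic signature.

package main

import "fmt"

// copyNodes returns a defensive copy of nodes. The three-index expression
// nodes[:0:0] yields an empty slice with zero capacity, so append must
// allocate a fresh backing array instead of reusing the original one.
func copyNodes(nodes []string) []string {
	return append(nodes[:0:0], nodes...)
}

func main() {
	nodes := []string{"tso-1", "tso-2"}
	snapshot := copyNodes(nodes)
	nodes[0] = "tso-3"       // later mutation of the original slice
	fmt.Println(snapshot[0]) // still "tso-1": the snapshot is unaffected
}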