From b39a544f0cf6ac284daa58605bdc62d8b7c06b59 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Tue, 13 Jun 2023 21:09:08 -0700 Subject: [PATCH] mcs, tso: fix potential inconsistency caused by non-atomic applying keyspace movement state change in the persistent store (#6596) ref tikv/pd#5895 fix potential inconsistency caused by non-atomic applying the state change in the persistent in the following cases: 1. Keyspace group split/merge 2. Keyspace movement across keyspace groups. Signed-off-by: Bin Shi --- pkg/tso/keyspace_group_manager.go | 12 +++- pkg/tso/keyspace_group_manager_test.go | 76 +++++++++++++++++++++++++- 2 files changed, 84 insertions(+), 4 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 9eaea2bf48e..3159518387f 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -563,7 +563,14 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( i++ j++ } else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen { - delete(kgm.keyspaceLookupTable, oldKeyspaces[i]) + // kgm.keyspaceLookupTable is a global lookup table for all keyspace groups, storing the + // keyspace group ID for each keyspace. If the keyspace group of this keyspace in this + // lookup table isn't the current keyspace group, it means the keyspace has been moved + // to another keyspace group which has already declared the ownership of the keyspace, + // and we shouldn't delete and overwrite the ownership. + if curGroupID, ok := kgm.keyspaceLookupTable[oldKeyspaces[i]]; ok && curGroupID == groupID { + delete(kgm.keyspaceLookupTable, oldKeyspaces[i]) + } i++ } else { newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{} @@ -621,7 +628,8 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) { // if kid == kg.ID, it means the keyspace still belongs to this keyspace group, // so we decouple the relationship in the global keyspace lookup table. // if kid != kg.ID, it means the keyspace has been moved to another keyspace group - // which has already declared the ownership of the keyspace. + // which has already declared the ownership of the keyspace, so we don't need + // delete it from the global keyspace lookup table and overwrite the ownership. if kid == kg.ID { delete(kgm.keyspaceLookupTable, kid) } diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 7817057aaec..1e7d072ade3 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -490,8 +490,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) re.NoError(err) - // Sleep for a while to wait for the events to propagate. If the restriction is not working, - // it will cause random failure. + // Sleep for a while to wait for the events to propagate. If the logic doesn't work + // as expected, it will cause random failure. time.Sleep(1 * time.Second) // Should still be able to get AM for keyspace 0 in keyspace group 0. am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck( @@ -508,6 +508,78 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { re.NotNil(kg) } +// TestKeyspaceMovementConsistency tests the consistency of keyspace movement. +// When a keyspace is moved from one keyspace group to another, the allocator manager +// update source group and target group state in etcd atomically. The TSO keyspace group +// manager watches the state change in persistent store but hard to apply the movement state +// change across two groups atomically. This test case is to test the movement state is +// eventually consistent, for example, if a keyspace "move to group B" event is applied +// before "move away from group A" event, the second event shouldn't overwrite the global +// state, such as the global keyspace group lookup table. +func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() { + re := suite.Require() + + mgr := suite.newUniqueKeyspaceGroupManager(1) + re.NotNil(mgr) + defer mgr.Close() + + rootPath := mgr.legacySvcRootPath + svcAddr := mgr.tsoServiceID.ServiceAddr + + var ( + am *AllocatorManager + kg *endpoint.KeyspaceGroup + kgid uint32 + err error + event *etcdEvent + ) + + // Create keyspace group 0 which contains keyspace 0, 1, 2. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, + mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 10, 20}) + // Create keyspace group 1 which contains keyspace 3, 4. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, + uint32(1), []uint32{11, 21}) + + err = mgr.Initialize() + re.NoError(err) + + // Should be able to get AM for keyspace 10 in keyspace group 0. + am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, mcsutils.DefaultKeyspaceGroupID) + re.NoError(err) + re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid) + re.NotNil(am) + re.NotNil(kg) + + // Move keyspace 10 from keyspace group 0 to keyspace group 1 and apply this state change + // to TSO first. + event = generateKeyspaceGroupPutEvent(1, []uint32{10, 11, 21}, []string{svcAddr}) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + // Wait until the keyspace 10 is served by keyspace group 1. + testutil.Eventually(re, func() bool { + _, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1) + return err == nil && kgid == 1 + }, testutil.WithWaitFor(3*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + event = generateKeyspaceGroupPutEvent( + mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 20}, []string{svcAddr}) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + + // Sleep for a while to wait for the events to propagate. If the restriction is not working, + // it will cause random failure. + time.Sleep(1 * time.Second) + // Should still be able to get AM for keyspace 10 in keyspace group 1. + _, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1) + re.NoError(err) + re.Equal(uint32(1), kgid) +} + // TestHandleTSORequestWithWrongMembership tests the case that HandleTSORequest receives // a tso request with mismatched keyspace and keyspace group. func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembership() {