Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs, tso: fix potential inconsistency caused by non-atomic applying keyspace movement state change in the persistent store #6596

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,14 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
i++
j++
} else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen {
delete(kgm.keyspaceLookupTable, oldKeyspaces[i])
// kgm.keyspaceLookupTable is a global lookup table for all keyspace groups, storing the
// keyspace group ID for each keyspace. If the keyspace group of this keyspace in this
// lookup table isn't the current keyspace group, it means the keyspace has been moved
// to another keyspace group which has already declared the ownership of the keyspace,
// and we shouldn't delete and overwrite the ownership.
if curGroupID, ok := kgm.keyspaceLookupTable[oldKeyspaces[i]]; ok && curGroupID == groupID {
delete(kgm.keyspaceLookupTable, oldKeyspaces[i])
}
i++
} else {
newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
Expand Down Expand Up @@ -621,7 +628,8 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
// if kid == kg.ID, it means the keyspace still belongs to this keyspace group,
// so we decouple the relationship in the global keyspace lookup table.
// if kid != kg.ID, it means the keyspace has been moved to another keyspace group
// which has already declared the ownership of the keyspace.
// which has already declared the ownership of the keyspace, so we don't need to
// delete it from the global keyspace lookup table and overwrite the ownership.
if kid == kg.ID {
delete(kgm.keyspaceLookupTable, kid)
}
Expand Down
76 changes: 74 additions & 2 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() {
err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
re.NoError(err)

// Sleep for a while to wait for the events to propagate. If the restriction is not working,
// it will cause random failure.
// Sleep for a while to wait for the events to propagate. If the logic doesn't work
// as expected, it will cause random failure.
time.Sleep(1 * time.Second)
// Should still be able to get AM for keyspace 0 in keyspace group 0.
am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(
Expand All @@ -508,6 +508,78 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() {
re.NotNil(kg)
}

// TestKeyspaceMovementConsistency tests the consistency of keyspace movement.
// When a keyspace is moved from one keyspace group to another, the allocator manager
// updates the source group and target group state in etcd atomically. The TSO keyspace
// group manager watches the state change in the persistent store, but it is hard for it
// to apply the movement state change across two groups atomically. This test case checks
// that the movement state is eventually consistent: for example, if a keyspace "move to
// group B" event is applied before the "move away from group A" event, the second event
// shouldn't overwrite the global state, such as the global keyspace group lookup table.
func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() {
	re := suite.Require()

	mgr := suite.newUniqueKeyspaceGroupManager(1)
	re.NotNil(mgr)
	defer mgr.Close()

	// Capture the root path and service address once and reuse them for every
	// etcd write below, so all fixtures are guaranteed to target the same manager.
	rootPath := mgr.legacySvcRootPath
	svcAddr := mgr.tsoServiceID.ServiceAddr

	var (
		am    *AllocatorManager
		kg    *endpoint.KeyspaceGroup
		kgid  uint32
		err   error
		event *etcdEvent
	)

	// Create the default keyspace group which contains the default keyspace, 10 and 20.
	addKeyspaceGroupAssignment(
		suite.ctx, suite.etcdClient, true,
		rootPath, svcAddr,
		mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 10, 20})
	// Create keyspace group 1 which contains keyspace 11 and 21.
	addKeyspaceGroupAssignment(
		suite.ctx, suite.etcdClient, true,
		rootPath, svcAddr,
		uint32(1), []uint32{11, 21})

	err = mgr.Initialize()
	re.NoError(err)

	// Should be able to get AM for keyspace 10 in the default keyspace group.
	am, kg, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, mcsutils.DefaultKeyspaceGroupID)
	re.NoError(err)
	re.Equal(mcsutils.DefaultKeyspaceGroupID, kgid)
	re.NotNil(am)
	re.NotNil(kg)

	// Move keyspace 10 from the default keyspace group to keyspace group 1 and apply the
	// target-side ("move to group 1") state change to TSO first.
	event = generateKeyspaceGroupPutEvent(1, []uint32{10, 11, 21}, []string{svcAddr})
	err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
	re.NoError(err)
	// Wait until the keyspace 10 is served by keyspace group 1.
	testutil.Eventually(re, func() bool {
		_, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1)
		return err == nil && kgid == 1
	}, testutil.WithWaitFor(3*time.Second), testutil.WithTickInterval(50*time.Millisecond))

	// Now apply the source-side ("move away from the default group") state change, which
	// drops keyspace 10 from the default keyspace group's membership.
	event = generateKeyspaceGroupPutEvent(
		mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 20}, []string{svcAddr})
	err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg)
	re.NoError(err)

	// Sleep for a while to wait for the event to propagate. There is no observable state
	// transition to poll for here: the expected outcome is that nothing changes for
	// keyspace 10. If the late source-side event wrongly overwrites the ownership declared
	// by keyspace group 1, the check below will fail (possibly intermittently).
	time.Sleep(1 * time.Second)
	// Should still be able to get AM for keyspace 10 in keyspace group 1.
	_, _, kgid, err = mgr.getKeyspaceGroupMetaWithCheck(10, 1)
	re.NoError(err)
	re.Equal(uint32(1), kgid)
}

// TestHandleTSORequestWithWrongMembership tests the case that HandleTSORequest receives
// a tso request with mismatched keyspace and keyspace group.
func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembership() {
Expand Down