Skip to content

Commit

Permalink
*: fix AtomicCheckAndPutRegion deadlock (#5758) (#5759)
Browse files Browse the repository at this point in the history
ref #5755, close #5757, ref #5758

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
ti-chi-bot and rleungx authored Dec 5, 2022
1 parent 5d6c912 commit d1a4433
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 2 deletions.
3 changes: 3 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,9 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {

var overlaps []*core.RegionInfo
if saveCache {
failpoint.Inject("decEpoch", func() {
region = region.Clone(core.SetRegionConfVer(2), core.SetRegionVersion(2))
})
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
Expand Down
1 change: 1 addition & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,7 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo
}
_, err := check(region, origin, overlaps)
if err != nil {
r.t.Unlock()
return nil, err
}
origin, overlaps, rangeChanged := r.setRegionLocked(region)
Expand Down
47 changes: 45 additions & 2 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/sasha-s/go-deadlock"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/mock/mockid"
Expand Down Expand Up @@ -179,6 +178,51 @@ func TestDamagedRegion(t *testing.T) {
re.Equal(uint64(1), rc.GetOperatorController().OperatorCount(operator.OpAdmin))
}

func TestStaleRegion(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tc, err := tests.NewTestCluster(ctx, 1)
defer tc.Destroy()
re.NoError(err)

err = tc.RunInitialServers()
re.NoError(err)

tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
clusterID := leaderServer.GetClusterID()
bootstrapCluster(re, clusterID, grpcPDClient)

region := &metapb.Region{
Id: 10,
StartKey: []byte("abc"),
EndKey: []byte("xyz"),
Peers: []*metapb.Peer{
{Id: 101, StoreId: 1},
{Id: 102, StoreId: 2},
{Id: 103, StoreId: 3},
},
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 10,
Version: 10,
},
}

// To put region.
regionInfoA := core.NewRegionInfo(region, region.Peers[0], core.SetApproximateSize(30))
err = tc.HandleRegionHeartbeat(regionInfoA)
re.NoError(err)
regionInfoA = regionInfoA.Clone(core.WithIncConfVer(), core.WithIncVersion())
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/decEpoch", `return(true)`))
tc.HandleRegionHeartbeat(regionInfoA)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/decEpoch"))
regionInfoA = regionInfoA.Clone(core.WithIncConfVer(), core.WithIncVersion())
err = tc.HandleRegionHeartbeat(regionInfoA)
re.NoError(err)
}

func TestGetPutConfig(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -566,7 +610,6 @@ func TestStoreVersionChange(t *testing.T) {

func TestConcurrentHandleRegion(t *testing.T) {
re := require.New(t)
deadlock.Opts.DeadlockTimeout = time.Minute
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dashboard.SetCheckInterval(30 * time.Minute)
Expand Down

0 comments on commit d1a4433

Please sign in to comment.