diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 32c8eefc284..fdb56c529ce 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -865,6 +865,9 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { var overlaps []*core.RegionInfo if saveCache { + failpoint.Inject("decEpoch", func() { + region = region.Clone(core.SetRegionConfVer(2), core.SetRegionVersion(2)) + }) // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, // check its validation again here. // diff --git a/server/core/region.go b/server/core/region.go index 03da953ba60..48114bdb951 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -751,6 +751,7 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo } _, err := check(region, origin, overlaps) if err != nil { + r.t.Unlock() return nil, err } origin, overlaps, rangeChanged := r.setRegionLocked(region) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 80e4da73704..c4614e27b51 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/replication_modepb" - "github.com/sasha-s/go-deadlock" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/dashboard" "github.com/tikv/pd/pkg/mock/mockid" @@ -179,6 +178,51 @@ func TestDamagedRegion(t *testing.T) { re.Equal(uint64(1), rc.GetOperatorController().OperatorCount(operator.OpAdmin)) } +func TestStaleRegion(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tc, err := tests.NewTestCluster(ctx, 1) + defer tc.Destroy() + re.NoError(err) + + err = tc.RunInitialServers() + re.NoError(err) + + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + clusterID := leaderServer.GetClusterID() + bootstrapCluster(re, clusterID, grpcPDClient) + + region := &metapb.Region{ + Id: 10, + StartKey: []byte("abc"), + EndKey: []byte("xyz"), + Peers: []*metapb.Peer{ + {Id: 101, StoreId: 1}, + {Id: 102, StoreId: 2}, + {Id: 103, StoreId: 3}, + }, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 10, + Version: 10, + }, + } + + // To put region. + regionInfoA := core.NewRegionInfo(region, region.Peers[0], core.SetApproximateSize(30)) + err = tc.HandleRegionHeartbeat(regionInfoA) + re.NoError(err) + regionInfoA = regionInfoA.Clone(core.WithIncConfVer(), core.WithIncVersion()) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/decEpoch", `return(true)`)) + tc.HandleRegionHeartbeat(regionInfoA) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/decEpoch")) + regionInfoA = regionInfoA.Clone(core.WithIncConfVer(), core.WithIncVersion()) + err = tc.HandleRegionHeartbeat(regionInfoA) + re.NoError(err) +} + func TestGetPutConfig(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) @@ -566,7 +610,6 @@ func TestStoreVersionChange(t *testing.T) { func TestConcurrentHandleRegion(t *testing.T) { re := require.New(t) - deadlock.Opts.DeadlockTimeout = time.Minute ctx, cancel := context.WithCancel(context.Background()) defer cancel() dashboard.SetCheckInterval(30 * time.Minute)