From c24f173ce43532aa776bef0558469881415b084b Mon Sep 17 00:00:00 2001 From: husharp Date: Tue, 27 Aug 2024 16:35:43 +0800 Subject: [PATCH] replace Signed-off-by: husharp --- server/schedule/operator/operator.go | 5 ++- server/schedule/operator/operator_test.go | 53 +++++++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/server/schedule/operator/operator.go b/server/schedule/operator/operator.go index 0e760846e64..d4dfb5f929e 100644 --- a/server/schedule/operator/operator.go +++ b/server/schedule/operator/operator.go @@ -299,10 +299,11 @@ func (o *Operator) Check(region *core.RegionInfo) OpStep { defer func() { _ = o.CheckTimeout() }() for step := atomic.LoadInt32(&o.currentStep); int(step) < len(o.steps); step++ { if o.steps[int(step)].IsFinish(region) { - if atomic.CompareAndSwapInt64(&(o.stepsTime[step]), 0, time.Now().UnixNano()) { + current := time.Now() + if atomic.CompareAndSwapInt64(&(o.stepsTime[step]), 0, current.UnixNano()) { startTime, _ := o.getCurrentTimeAndStep() operatorStepDuration.WithLabelValues(reflect.TypeOf(o.steps[int(step)]).Name()). - Observe(time.Unix(0, o.stepsTime[step]).Sub(startTime).Seconds()) + Observe(current.Sub(startTime).Seconds()) } atomic.StoreInt32(&o.currentStep, step+1) } else { diff --git a/server/schedule/operator/operator_test.go b/server/schedule/operator/operator_test.go index 87e44e7a6c6..0fa2361c75a 100644 --- a/server/schedule/operator/operator_test.go +++ b/server/schedule/operator/operator_test.go @@ -18,11 +18,13 @@ import ( "context" "encoding/json" "fmt" + "sync" "sync/atomic" "testing" "time" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/mock/mockcluster" "github.com/tikv/pd/server/config" @@ -487,3 +489,54 @@ func (suite *operatorTestSuite) TestRecord() { suite.Equal(now, ob.FinishTime) suite.Greater(ob.duration.Seconds(), time.Second.Seconds()) } + +func TestOperatorCheckConcurrently(t *testing.T) { + re := require.New(t) + region := newTestRegion(1, 1, [2]uint64{1, 1}, [2]uint64{2, 2}) + // addPeer1, transferLeader1, removePeer3 + steps := []OpStep{ + AddPeer{ToStore: 1, PeerID: 1}, + TransferLeader{FromStore: 3, ToStore: 1}, + RemovePeer{FromStore: 3}, + } + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpAdmin|OpLeader|OpRegion, steps...) + re.Equal(core.Urgent, op.GetPriorityLevel()) + checkSteps(re, op, steps) + op.Start() + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + re.Nil(op.Check(region)) + }() + } + wg.Wait() +} + +func newTestRegion(regionID uint64, leaderPeer uint64, peers ...[2]uint64) *core.RegionInfo { + var ( + region metapb.Region + leader *metapb.Peer + ) + region.Id = regionID + for i := range peers { + peer := &metapb.Peer{ + Id: peers[i][1], + StoreId: peers[i][0], + } + region.Peers = append(region.Peers, peer) + if peer.GetId() == leaderPeer { + leader = peer + } + } + regionInfo := core.NewRegionInfo(®ion, leader, core.SetApproximateSize(50), core.SetApproximateKeys(50)) + return regionInfo +} + +func checkSteps(re *require.Assertions, op *Operator, steps []OpStep) { + re.Len(steps, op.Len()) + for i := range steps { + re.Equal(steps[i], op.Step(i)) + } +}