Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

operator: make additional information thread safe #8104

Merged
merged 1 commit into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind
brief += fmt.Sprintf(" and keys %v", hexKeys)
}
op := NewOperator(desc, brief, region.GetID(), region.GetRegionEpoch(), kind|OpSplit, region.GetApproximateSize(), step)
op.AdditionalInfos["region-start-key"] = core.HexRegionKeyStr(logutil.RedactBytes(region.GetStartKey()))
op.AdditionalInfos["region-end-key"] = core.HexRegionKeyStr(logutil.RedactBytes(region.GetEndKey()))
op.SetAdditionalInfo("region-start-key", core.HexRegionKeyStr(logutil.RedactBytes(region.GetStartKey())))
op.SetAdditionalInfo("region-end-key", core.HexRegionKeyStr(logutil.RedactBytes(region.GetEndKey())))
return op, nil
}

Expand Down
46 changes: 19 additions & 27 deletions pkg/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package operator

import (
"encoding/json"
"fmt"
"reflect"
"strconv"
Expand Down Expand Up @@ -83,7 +82,7 @@ type Operator struct {
level constant.PriorityLevel
Counters []prometheus.Counter
FinishedCounters []prometheus.Counter
AdditionalInfos map[string]string
additionalInfos opAdditionalInfo
ApproximateSize int64
timeout time.Duration
influence *OpInfluence
Expand All @@ -100,16 +99,18 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region
maxDuration += v.Timeout(approximateSize).Seconds()
}
return &Operator{
desc: desc,
brief: brief,
regionID: regionID,
regionEpoch: regionEpoch,
kind: kind,
steps: steps,
stepsTime: make([]int64, len(steps)),
status: NewOpStatusTracker(),
level: level,
AdditionalInfos: make(map[string]string),
desc: desc,
brief: brief,
regionID: regionID,
regionEpoch: regionEpoch,
kind: kind,
steps: steps,
stepsTime: make([]int64, len(steps)),
status: NewOpStatusTracker(),
level: level,
additionalInfos: opAdditionalInfo{
value: make(map[string]string),
},
ApproximateSize: approximateSize,
timeout: time.Duration(maxDuration) * time.Second,
}
Expand All @@ -118,8 +119,8 @@ func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.Region
// Sync some attribute with the given timeout.
func (o *Operator) Sync(other *Operator) {
o.timeout = other.timeout
o.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(other.RegionID(), 10)
other.AdditionalInfos[string(RelatedMergeRegion)] = strconv.FormatUint(o.RegionID(), 10)
o.SetAdditionalInfo(string(RelatedMergeRegion), strconv.FormatUint(other.RegionID(), 10))
other.SetAdditionalInfo(string(RelatedMergeRegion), strconv.FormatUint(o.RegionID(), 10))
}

func (o *Operator) String() string {
Expand Down Expand Up @@ -297,8 +298,10 @@ func (o *Operator) CheckSuccess() bool {

// Cancel marks the operator canceled.
func (o *Operator) Cancel(reason ...CancelReasonType) bool {
if _, ok := o.AdditionalInfos[cancelReason]; !ok && len(reason) != 0 {
o.AdditionalInfos[cancelReason] = string(reason[0])
o.additionalInfos.Lock()
defer o.additionalInfos.Unlock()
if _, ok := o.additionalInfos.value[cancelReason]; !ok && len(reason) != 0 {
o.additionalInfos.value[cancelReason] = string(reason[0])
}
return o.status.To(CANCELED)
}
Expand Down Expand Up @@ -507,17 +510,6 @@ func (o *Operator) Record(finishTime time.Time) *OpRecord {
return record
}

// GetAdditionalInfo returns additional info with string
func (o *Operator) GetAdditionalInfo() string {
if len(o.AdditionalInfos) != 0 {
additionalInfo, err := json.Marshal(o.AdditionalInfos)
if err == nil {
return string(additionalInfo)
}
}
return ""
}

// IsLeaveJointStateOperator returns true if the desc is OpDescLeaveJointState.
func (o *Operator) IsLeaveJointStateOperator() bool {
return strings.EqualFold(o.desc, OpDescLeaveJointState)
Expand Down
14 changes: 7 additions & 7 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (oc *Controller) addOperatorInner(op *Operator) bool {
log.Info("add operator",
zap.Uint64("region-id", regionID),
zap.Reflect("operator", op),
zap.String("additional-info", op.GetAdditionalInfo()))
zap.String("additional-info", op.LogAdditionalInfo()))

// If there is an old operator, replace it. The priority should be checked
// already.
Expand Down Expand Up @@ -657,7 +657,7 @@ func (oc *Controller) removeOperatorInner(op *Operator) bool {
}

func (oc *Controller) removeRelatedMergeOperator(op *Operator) {
relatedID, _ := strconv.ParseUint(op.AdditionalInfos[string(RelatedMergeRegion)], 10, 64)
relatedID, _ := strconv.ParseUint(op.GetAdditionalInfo(string(RelatedMergeRegion)), 10, 64)
relatedOpi, ok := oc.operators.Load(relatedID)
if !ok {
return
Expand All @@ -666,7 +666,7 @@ func (oc *Controller) removeRelatedMergeOperator(op *Operator) {
if relatedOp != nil && relatedOp.Status() != CANCELED {
log.Info("operator canceled related merge region",
zap.Uint64("region-id", relatedOp.RegionID()),
zap.String("additional-info", relatedOp.GetAdditionalInfo()),
zap.String("additional-info", relatedOp.LogAdditionalInfo()),
zap.Duration("takes", relatedOp.RunningTime()))
oc.removeOperatorInner(relatedOp)
relatedOp.Cancel(RelatedMergeRegion)
Expand Down Expand Up @@ -695,7 +695,7 @@ func (oc *Controller) buryOperator(op *Operator) {
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op),
zap.String("additional-info", op.GetAdditionalInfo()))
zap.String("additional-info", op.LogAdditionalInfo()))
operatorCounter.WithLabelValues(op.Desc(), "finish").Inc()
operatorDuration.WithLabelValues(op.Desc()).Observe(op.RunningTime().Seconds())
for _, counter := range op.FinishedCounters {
Expand All @@ -706,7 +706,7 @@ func (oc *Controller) buryOperator(op *Operator) {
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op),
zap.String("additional-info", op.GetAdditionalInfo()))
zap.String("additional-info", op.LogAdditionalInfo()))
operatorCounter.WithLabelValues(op.Desc(), "replace").Inc()
case EXPIRED:
log.Info("operator expired",
Expand All @@ -719,14 +719,14 @@ func (oc *Controller) buryOperator(op *Operator) {
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op),
zap.String("additional-info", op.GetAdditionalInfo()))
zap.String("additional-info", op.LogAdditionalInfo()))
operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
case CANCELED:
log.Info("operator canceled",
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op),
zap.String("additional-info", op.GetAdditionalInfo()),
zap.String("additional-info", op.LogAdditionalInfo()),
)
operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc()
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/schedule/operator/status_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package operator

import (
"encoding/json"
"time"

"github.com/tikv/pd/pkg/utils/syncutil"
Expand Down Expand Up @@ -135,3 +136,35 @@ func (trk *OpStatusTracker) String() string {
defer trk.rw.RUnlock()
return OpStatusToString(trk.current)
}

// opAdditionalInfo holds an operator's additional key/value information
// together with the read-write lock that the Operator accessor methods take
// before touching it.
type opAdditionalInfo struct {
syncutil.RWMutex
// value maps an info key to its string value. NewOperator initializes it
// to an empty, non-nil map.
value map[string]string
}

// SetAdditionalInfo stores value under key in the operator's additional
// information, replacing any value previously stored for that key. The write
// is performed while holding the additional-info write lock.
func (o *Operator) SetAdditionalInfo(key string, value string) {
	o.additionalInfos.Lock()
	defer o.additionalInfos.Unlock()
	// Assigning into a nil map panics, so allocate the map on first use for
	// operators whose map has not been initialized yet.
	if o.additionalInfos.value == nil {
		o.additionalInfos.value = make(map[string]string)
	}
	o.additionalInfos.value[key] = value
}

// GetAdditionalInfo looks up key in the operator's additional information
// while holding the read lock and returns the stored value, or the empty
// string when the key is absent.
func (o *Operator) GetAdditionalInfo(key string) string {
	infos := &o.additionalInfos
	infos.RLock()
	defer infos.RUnlock()
	stored := infos.value[key]
	return stored
}

// LogAdditionalInfo renders the operator's additional information as a JSON
// object string for logging. It returns the empty string when there is no
// additional information or when JSON encoding fails. The map is read while
// holding the read lock.
func (o *Operator) LogAdditionalInfo() string {
	infos := &o.additionalInfos
	infos.RLock()
	defer infos.RUnlock()

	if len(infos.value) == 0 {
		return ""
	}
	encoded, err := json.Marshal(infos.value)
	if err != nil {
		return ""
	}
	return string(encoded)
}
25 changes: 25 additions & 0 deletions pkg/schedule/operator/status_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package operator

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -178,3 +180,26 @@ func checkReachTime(re *require.Assertions, trk *OpStatusTracker, reached ...OpS
re.True(trk.ReachTimeOf(st).IsZero())
}
}

// TestAdditionalInfoConcurrent starts 1000 goroutines that each set a distinct
// key on one shared operator and immediately read it back, then checks that
// the logged additional info is non-empty once all goroutines have finished.
func TestAdditionalInfoConcurrent(t *testing.T) {
	const workers = 1000
	op := NewOperator("test", "test", 0, nil, OpAdmin, 0)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(n int) {
			defer wg.Done()
			key, value := fmt.Sprintf("key%d", n), fmt.Sprintf("value%d", n)
			op.SetAdditionalInfo(key, value)
			if got := op.GetAdditionalInfo(key); got != value {
				t.Errorf("unexpected value for key %s", key)
			}
		}(i)
	}
	wg.Wait()

	if op.LogAdditionalInfo() == "" {
		t.Error("LogAdditionalInfo returned an empty string")
	}
}
4 changes: 2 additions & 2 deletions pkg/schedule/scatter/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
if op != nil {
scatterSuccessCounter.Inc()
r.Put(targetPeers, targetLeader, group)
op.AdditionalInfos["group"] = group
op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10)
op.SetAdditionalInfo("group", group)
op.SetAdditionalInfo("leader-picked-count", strconv.FormatUint(leaderStorePickedCount, 10))
op.SetPriorityLevel(constant.High)
}
return op, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/scatter/region_scatterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ func TestSelectedStoresTooFewPeers(t *testing.T) {
re.NoError(err)
re.False(isPeerCountChanged(op))
if op != nil {
re.Equal(group, op.AdditionalInfos["group"])
re.Equal(group, op.GetAdditionalInfo("group"))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.
op.FinishedCounters = append(op.FinishedCounters,
balanceDirectionCounter.WithLabelValues(l.GetName(), solver.SourceMetricLabel(), solver.TargetMetricLabel()),
)
op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)
op.AdditionalInfos["targetScore"] = strconv.FormatFloat(solver.targetScore, 'f', 2, 64)
op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64))
op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64))
return op
}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
op.FinishedCounters = append(op.FinishedCounters,
balanceDirectionCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel),
)
op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)
op.AdditionalInfos["targetScore"] = strconv.FormatFloat(solver.targetScore, 'f', 2, 64)
op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64))
op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64))
return op
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (b *balanceWitnessScheduler) createOperator(solver *solver, collector *plan
b.counter.WithLabelValues("move-witness", solver.SourceMetricLabel()+"-out"),
b.counter.WithLabelValues("move-witness", solver.TargetMetricLabel()+"-in"),
)
op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)
op.AdditionalInfos["targetScore"] = strconv.FormatFloat(solver.targetScore, 'f', 2, 64)
op.SetAdditionalInfo("sourceScore", strconv.FormatFloat(solver.sourceScore, 'f', 2, 64))
op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64))
return op
}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1653,8 +1653,8 @@ func (bs *balanceSolver) splitBucketsByLoad(region *core.RegionInfo, bucketStats
}
op := bs.splitBucketsOperator(region, [][]byte{splitKey})
if op != nil {
op.AdditionalInfos["accLoads"] = strconv.FormatUint(acc-stats[splitIdx-1].Loads[dim], 10)
op.AdditionalInfos["totalLoads"] = strconv.FormatUint(totalLoads, 10)
op.SetAdditionalInfo("accLoads", strconv.FormatUint(acc-stats[splitIdx-1].Loads[dim], 10))
op.SetAdditionalInfo("totalLoads", strconv.FormatUint(totalLoads, 10))
}
return op
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/split_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (s *splitBucketScheduler) splitBucket(plan *splitBucketPlan) []*operator.Op
return nil
}
splitBucketNewOperatorCounter.Inc()
op.AdditionalInfos["hot-degree"] = strconv.FormatInt(int64(splitBucket.HotDegree), 10)
op.SetAdditionalInfo("hot-degree", strconv.FormatInt(int64(splitBucket.HotDegree), 10))
return []*operator.Operator{op}
}
return nil
Expand Down