Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs, schedule: use txn in label rule manager #7738

Merged
merged 7 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 39 additions & 20 deletions pkg/schedule/labeler/labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule/rangelist"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -88,11 +89,12 @@ func (l *RegionLabeler) checkAndClearExpiredLabels() {
continue
}
if len(rule.Labels) == 0 {
err = l.storage.DeleteRegionRule(key)
delete(l.labelRules, key)
deleted = true
err = l.DeleteLabelRuleLocked(key)
if err == nil {
deleted = true
}
} else {
err = l.storage.SaveRegionRule(key, rule)
err = l.SaveLabelRuleLocked(rule)
}
if err != nil {
log.Error("failed to save rule expired label rule", zap.String("rule-key", key), zap.Error(err))
Expand Down Expand Up @@ -123,8 +125,10 @@ func (l *RegionLabeler) loadRules() error {
if err != nil {
return err
}
for _, d := range toDelete {
if err = l.storage.DeleteRegionRule(d); err != nil {
for _, id := range toDelete {
if err := l.storage.RunInTxn(l.ctx, func(txn kv.Txn) error {
return l.storage.DeleteRegionRule(txn, id)
}); err != nil {
return err
}
}
Expand Down Expand Up @@ -197,11 +201,10 @@ func (l *RegionLabeler) getAndCheckRule(id string, now time.Time) *LabelRule {
return rule
}
if len(rule.Labels) == 0 {
l.storage.DeleteRegionRule(id)
delete(l.labelRules, id)
l.DeleteLabelRuleLocked(id)
return nil
}
l.storage.SaveRegionRule(id, rule)
l.SaveLabelRuleLocked(rule)
return rule
}

Expand All @@ -221,17 +224,28 @@ func (l *RegionLabeler) SetLabelRuleLocked(rule *LabelRule) error {
if err := rule.checkAndAdjust(); err != nil {
return err
}
if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil {
if err := l.SaveLabelRuleLocked(rule); err != nil {
return err
}
l.labelRules[rule.ID] = rule
return nil
}

// SaveLabelRuleLocked inserts or updates a LabelRule but not buildRangeList.
// It only saves the rule to storage, and does not update the in-memory states.
func (l *RegionLabeler) SaveLabelRuleLocked(rule *LabelRule) error {
return l.storage.RunInTxn(l.ctx, func(txn kv.Txn) error {
return l.storage.SaveRegionRule(txn, rule.ID, rule)
})
}

// DeleteLabelRule removes a LabelRule.
func (l *RegionLabeler) DeleteLabelRule(id string) error {
l.Lock()
defer l.Unlock()
if _, ok := l.labelRules[id]; !ok {
return errs.ErrRegionRuleNotFound.FastGenByArgs(id)
}
if err := l.DeleteLabelRuleLocked(id); err != nil {
return err
}
Expand All @@ -241,10 +255,9 @@ func (l *RegionLabeler) DeleteLabelRule(id string) error {

// DeleteLabelRuleLocked removes a LabelRule but not buildRangeList.
func (l *RegionLabeler) DeleteLabelRuleLocked(id string) error {
if _, ok := l.labelRules[id]; !ok {
return errs.ErrRegionRuleNotFound.FastGenByArgs(id)
}
if err := l.storage.DeleteRegionRule(id); err != nil {
if err := l.storage.RunInTxn(l.ctx, func(txn kv.Txn) error {
return l.storage.DeleteRegionRule(txn, id)
}); err != nil {
return err
}
delete(l.labelRules, id)
Expand All @@ -260,15 +273,21 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error {
}

// save to storage
var batch []func(kv.Txn) error
for _, key := range patch.DeleteRules {
if err := l.storage.DeleteRegionRule(key); err != nil {
return err
}
localKey := key
batch = append(batch, func(txn kv.Txn) error {
return l.storage.DeleteRegionRule(txn, localKey)
})
}
for _, rule := range patch.SetRules {
if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil {
return err
}
localID, localRule := rule.ID, rule
batch = append(batch, func(txn kv.Txn) error {
return l.storage.SaveRegionRule(txn, localID, localRule)
})
}
if err := endpoint.RunBatchOpInTxn(l.ctx, l.storage, batch); err != nil {
return err
}

// update in-memory states.
Expand Down
44 changes: 44 additions & 0 deletions pkg/schedule/labeler/labeler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"sort"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -132,6 +133,49 @@ func TestGetSetRule(t *testing.T) {
for id, rule := range allRules {
expectSameRules(re, rule, rules[id+1])
}

for _, r := range rules {
labeler.DeleteLabelRule(r.ID)
}
re.Empty(labeler.GetAllLabelRules())

// test patch rules in batch
rulesNum := 200
patch.SetRules = patch.SetRules[:0]
patch.DeleteRules = patch.DeleteRules[:0]
for i := 1; i <= rulesNum; i++ {
patch.SetRules = append(patch.SetRules, &LabelRule{
ID: fmt.Sprintf("rule_%d", i),
Labels: []RegionLabel{
{Key: fmt.Sprintf("k_%d", i), Value: fmt.Sprintf("v_%d", i)},
},
RuleType: "key-range",
Data: MakeKeyRanges("", ""),
})
}
err = labeler.Patch(patch)
re.NoError(err)
allRules = labeler.GetAllLabelRules()
re.Len(allRules, rulesNum)
sort.Slice(allRules, func(i, j int) bool {
i1, err := strconv.Atoi(allRules[i].ID[5:])
re.NoError(err)
j1, err := strconv.Atoi(allRules[j].ID[5:])
re.NoError(err)
return i1 < j1
})
for id, rule := range allRules {
expectSameRules(re, rule, patch.SetRules[id])
}
patch.SetRules = patch.SetRules[:0]
patch.DeleteRules = patch.DeleteRules[:0]
for i := 1; i <= rulesNum; i++ {
patch.DeleteRules = append(patch.DeleteRules, fmt.Sprintf("rule_%d", i))
}
err = labeler.Patch(patch)
re.NoError(err)
allRules = labeler.GetAllLabelRules()
re.Empty(allRules)
}

func TestIndex(t *testing.T) {
Expand Down
27 changes: 2 additions & 25 deletions pkg/schedule/placement/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -202,7 +201,7 @@ func (m *RuleManager) loadRules() error {
return m.storage.DeleteRule(txn, localKey)
})
}
return m.runBatchOpInTxn(batch)
return endpoint.RunBatchOpInTxn(m.ctx, m.storage, batch)
}

func (m *RuleManager) loadGroups() error {
Expand Down Expand Up @@ -521,7 +520,7 @@ func (m *RuleManager) savePatch(p *ruleConfig) error {
})
}
}
return m.runBatchOpInTxn(batch)
return endpoint.RunBatchOpInTxn(m.ctx, m.storage, batch)
}

// SetRules inserts or updates lots of Rules at once.
Expand Down Expand Up @@ -814,28 +813,6 @@ func (m *RuleManager) IsInitialized() bool {
return m.initialized
}

func (m *RuleManager) runBatchOpInTxn(batch []func(kv.Txn) error) error {
// execute batch in transaction with limited operations per transaction
for start := 0; start < len(batch); start += etcdutil.MaxEtcdTxnOps {
end := start + etcdutil.MaxEtcdTxnOps
if end > len(batch) {
end = len(batch)
}
err := m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
for _, op := range batch[start:end] {
if err = op(txn); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
}
return nil
}

// checkRule check the rule whether will have RuleFit after FitRegion
// in order to reduce the calculation.
func checkRule(rule *Rule, stores []*core.StoreInfo) bool {
Expand Down
15 changes: 8 additions & 7 deletions pkg/storage/endpoint/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@ type RuleStorage interface {
LoadRule(ruleKey string) (string, error)
LoadRules(f func(k, v string)) error
LoadRuleGroups(f func(k, v string)) error
LoadRegionRules(f func(k, v string)) error

// We need to use txn to avoid concurrent modification.
// And it is helpful for the scheduling server to watch the rule.
SaveRule(txn kv.Txn, ruleKey string, rule interface{}) error
DeleteRule(txn kv.Txn, ruleKey string) error
SaveRuleGroup(txn kv.Txn, groupID string, group interface{}) error
DeleteRuleGroup(txn kv.Txn, groupID string) error
SaveRegionRule(txn kv.Txn, ruleKey string, rule interface{}) error
DeleteRegionRule(txn kv.Txn, ruleKey string) error

LoadRegionRules(f func(k, v string)) error
SaveRegionRule(ruleKey string, rule interface{}) error
DeleteRegionRule(ruleKey string) error
RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error
}

Expand Down Expand Up @@ -73,13 +74,13 @@ func (se *StorageEndpoint) LoadRegionRules(f func(k, v string)) error {
}

// SaveRegionRule saves a region rule to the storage.
func (se *StorageEndpoint) SaveRegionRule(ruleKey string, rule interface{}) error {
return se.saveJSON(regionLabelKeyPath(ruleKey), rule)
func (se *StorageEndpoint) SaveRegionRule(txn kv.Txn, ruleKey string, rule interface{}) error {
return saveJSONInTxn(txn, regionLabelKeyPath(ruleKey), rule)
}

// DeleteRegionRule removes a region rule from storage.
func (se *StorageEndpoint) DeleteRegionRule(ruleKey string) error {
return se.Remove(regionLabelKeyPath(ruleKey))
func (se *StorageEndpoint) DeleteRegionRule(txn kv.Txn, ruleKey string) error {
return txn.Remove(regionLabelKeyPath(ruleKey))
}

// LoadRule load a placement rule from storage.
Expand Down
30 changes: 30 additions & 0 deletions pkg/storage/endpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package endpoint

import (
"context"
"encoding/json"
"strings"

"github.com/gogo/protobuf/proto"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
)

Expand Down Expand Up @@ -74,3 +76,31 @@ func (se *StorageEndpoint) loadRangeByPrefix(prefix string, f func(k, v string))
nextKey = keys[len(keys)-1] + "\x00"
}
}

// TxnStorage is the interface with RunInTxn
type TxnStorage interface {
RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error
}

// RunBatchOpInTxn runs a batch of operations in transaction.
// The batch is split into multiple transactions if it exceeds the maximum number of operations per transaction.
func RunBatchOpInTxn(ctx context.Context, storage TxnStorage, batch []func(kv.Txn) error) error {
for start := 0; start < len(batch); start += etcdutil.MaxEtcdTxnOps {
end := start + etcdutil.MaxEtcdTxnOps
if end > len(batch) {
end = len(batch)
}
err := storage.RunInTxn(ctx, func(txn kv.Txn) (err error) {
for _, op := range batch[start:end] {
if err = op(txn); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
}
return nil
}
Loading