From eebe8cc369d97807036faf73cc788c170e486fda Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 18 Jan 2024 23:54:35 +0800 Subject: [PATCH 1/4] mcs, schedule: use txn in label rule manager Signed-off-by: lhy1024 --- pkg/schedule/labeler/labeler.go | 45 +++++++++++++++----------- pkg/schedule/placement/rule_manager.go | 27 ++-------------- pkg/storage/endpoint/rule.go | 15 +++++---- pkg/storage/endpoint/util.go | 30 +++++++++++++++++ 4 files changed, 67 insertions(+), 50 deletions(-) diff --git a/pkg/schedule/labeler/labeler.go b/pkg/schedule/labeler/labeler.go index 7254460f577..6fad38c9cc3 100644 --- a/pkg/schedule/labeler/labeler.go +++ b/pkg/schedule/labeler/labeler.go @@ -24,6 +24,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule/rangelist" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" @@ -88,11 +89,10 @@ func (l *RegionLabeler) checkAndClearExpiredLabels() { continue } if len(rule.Labels) == 0 { - err = l.storage.DeleteRegionRule(key) - delete(l.labelRules, key) + err = l.DeleteLabelRuleLocked(key) deleted = true } else { - err = l.storage.SaveRegionRule(key, rule) + err = l.SetLabelRuleLocked(rule) } if err != nil { log.Error("failed to save rule expired label rule", zap.String("rule-key", key), zap.Error(err)) @@ -124,7 +124,7 @@ func (l *RegionLabeler) loadRules() error { return err } for _, d := range toDelete { - if err = l.storage.DeleteRegionRule(d); err != nil { + if err = l.DeleteLabelRuleLocked(d); err != nil { return err } } @@ -197,11 +197,10 @@ func (l *RegionLabeler) getAndCheckRule(id string, now time.Time) *LabelRule { return rule } if len(rule.Labels) == 0 { - l.storage.DeleteRegionRule(id) - delete(l.labelRules, id) + l.DeleteLabelRuleLocked(id) return nil } - l.storage.SaveRegionRule(id, rule) + l.SetLabelRuleLocked(rule) return rule } @@ -221,7 +220,9 @@ func (l *RegionLabeler) SetLabelRuleLocked(rule *LabelRule) error { if err := rule.checkAndAdjust(); err != nil { return err } - if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil { + if err := l.storage.RunInTxn(l.ctx, func(txn kv.Txn) error { + return l.storage.SaveRegionRule(txn, rule.ID, rule) + }); err != nil { return err } l.labelRules[rule.ID] = rule @@ -232,6 +233,9 @@ func (l *RegionLabeler) SetLabelRuleLocked(rule *LabelRule) error { func (l *RegionLabeler) DeleteLabelRule(id string) error { l.Lock() defer l.Unlock() + if _, ok := l.labelRules[id]; !ok { + return errs.ErrRegionRuleNotFound.FastGenByArgs(id) + } if err := l.DeleteLabelRuleLocked(id); err != nil { return err } @@ -241,10 +245,9 @@ func (l *RegionLabeler) DeleteLabelRule(id string) error { // DeleteLabelRuleLocked removes a LabelRule but not buildRangeList. func (l *RegionLabeler) DeleteLabelRuleLocked(id string) error { - if _, ok := l.labelRules[id]; !ok { - return errs.ErrRegionRuleNotFound.FastGenByArgs(id) - } - if err := l.storage.DeleteRegionRule(id); err != nil { + if err := l.storage.RunInTxn(l.ctx, func(txn kv.Txn) error { + return l.storage.DeleteRegionRule(txn, id) + }); err != nil { return err } delete(l.labelRules, id) @@ -260,15 +263,21 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error { } // save to storage + var batch []func(kv.Txn) error for _, key := range patch.DeleteRules { - if err := l.storage.DeleteRegionRule(key); err != nil { - return err - } + localKey := key + batch = append(batch, func(txn kv.Txn) error { + return l.storage.DeleteRegionRule(txn, localKey) + }) } for _, rule := range patch.SetRules { - if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil { - return err - } + localID, localRule := rule.ID, rule + batch = append(batch, func(txn kv.Txn) error { + return l.storage.SaveRegionRule(txn, localID, localRule) + }) + } + if err := endpoint.RunBatchOpInTxn(l.ctx, l.storage, batch); err != nil { + return err } // update in-memory states. diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index e0dc6056a89..f44258d797c 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -34,7 +34,6 @@ import ( "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" - "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -202,7 +201,7 @@ func (m *RuleManager) loadRules() error { return m.storage.DeleteRule(txn, localKey) }) } - return m.runBatchOpInTxn(batch) + return endpoint.RunBatchOpInTxn(m.ctx, m.storage, batch) } func (m *RuleManager) loadGroups() error { @@ -521,7 +520,7 @@ func (m *RuleManager) savePatch(p *ruleConfig) error { }) } } - return m.runBatchOpInTxn(batch) + return endpoint.RunBatchOpInTxn(m.ctx, m.storage, batch) } // SetRules inserts or updates lots of Rules at once. @@ -814,28 +813,6 @@ func (m *RuleManager) IsInitialized() bool { return m.initialized } -func (m *RuleManager) runBatchOpInTxn(batch []func(kv.Txn) error) error { - // execute batch in transaction with limited operations per transaction - for start := 0; start < len(batch); start += etcdutil.MaxEtcdTxnOps { - end := start + etcdutil.MaxEtcdTxnOps - if end > len(batch) { - end = len(batch) - } - err := m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { - for _, op := range batch[start:end] { - if err = op(txn); err != nil { - return err - } - } - return nil - }) - if err != nil { - return err - } - } - return nil -} - // checkRule check the rule whether will have RuleFit after FitRegion // in order to reduce the calculation. func checkRule(rule *Rule, stores []*core.StoreInfo) bool { diff --git a/pkg/storage/endpoint/rule.go b/pkg/storage/endpoint/rule.go index ad245f527bb..b0827fda477 100644 --- a/pkg/storage/endpoint/rule.go +++ b/pkg/storage/endpoint/rule.go @@ -27,16 +27,17 @@ type RuleStorage interface { LoadRule(ruleKey string) (string, error) LoadRules(f func(k, v string)) error LoadRuleGroups(f func(k, v string)) error + LoadRegionRules(f func(k, v string)) error + // We need to use txn to avoid concurrent modification. // And it is helpful for the scheduling server to watch the rule. SaveRule(txn kv.Txn, ruleKey string, rule interface{}) error DeleteRule(txn kv.Txn, ruleKey string) error SaveRuleGroup(txn kv.Txn, groupID string, group interface{}) error DeleteRuleGroup(txn kv.Txn, groupID string) error + SaveRegionRule(txn kv.Txn, ruleKey string, rule interface{}) error + DeleteRegionRule(txn kv.Txn, ruleKey string) error - LoadRegionRules(f func(k, v string)) error - SaveRegionRule(ruleKey string, rule interface{}) error - DeleteRegionRule(ruleKey string) error RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error } @@ -73,13 +74,13 @@ func (se *StorageEndpoint) LoadRegionRules(f func(k, v string)) error { } // SaveRegionRule saves a region rule to the storage. -func (se *StorageEndpoint) SaveRegionRule(ruleKey string, rule interface{}) error { - return se.saveJSON(regionLabelKeyPath(ruleKey), rule) +func (se *StorageEndpoint) SaveRegionRule(txn kv.Txn, ruleKey string, rule interface{}) error { + return saveJSONInTxn(txn, regionLabelKeyPath(ruleKey), rule) } // DeleteRegionRule removes a region rule from storage. -func (se *StorageEndpoint) DeleteRegionRule(ruleKey string) error { - return se.Remove(regionLabelKeyPath(ruleKey)) +func (se *StorageEndpoint) DeleteRegionRule(txn kv.Txn, ruleKey string) error { + return txn.Remove(regionLabelKeyPath(ruleKey)) } // LoadRule load a placement rule from storage. diff --git a/pkg/storage/endpoint/util.go b/pkg/storage/endpoint/util.go index 62b170a1a8e..cf1e4ef2315 100644 --- a/pkg/storage/endpoint/util.go +++ b/pkg/storage/endpoint/util.go @@ -15,12 +15,14 @@ package endpoint import ( + "context" "encoding/json" "strings" "github.com/gogo/protobuf/proto" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" ) @@ -74,3 +76,31 @@ func (se *StorageEndpoint) loadRangeByPrefix(prefix string, f func(k, v string)) nextKey = keys[len(keys)-1] + "\x00" } } + +// TxnStorage is the interface with RunInTxn +type TxnStorage interface { + RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error +} + +// RunBatchOpInTxn runs a batch of operations in transaction. +// The batch is split into multiple transactions if it exceeds the maximum number of operations per transaction. +func RunBatchOpInTxn(ctx context.Context, storage TxnStorage, batch []func(kv.Txn) error) error { + for start := 0; start < len(batch); start += etcdutil.MaxEtcdTxnOps { + end := start + etcdutil.MaxEtcdTxnOps + if end > len(batch) { + end = len(batch) + } + err := storage.RunInTxn(ctx, func(txn kv.Txn) (err error) { + for _, op := range batch[start:end] { + if err = op(txn); err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + } + return nil +} From b38f3f1ac71edeb8137ecd89570425499ffe3121 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 19 Jan 2024 15:55:18 +0800 Subject: [PATCH 2/4] address comments Signed-off-by: lhy1024 --- pkg/schedule/labeler/labeler.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/pkg/schedule/labeler/labeler.go b/pkg/schedule/labeler/labeler.go index 6fad38c9cc3..8e5a4a3268a 100644 --- a/pkg/schedule/labeler/labeler.go +++ b/pkg/schedule/labeler/labeler.go @@ -90,9 +90,11 @@ func (l *RegionLabeler) checkAndClearExpiredLabels() { } if len(rule.Labels) == 0 { err = l.DeleteLabelRuleLocked(key) - deleted = true + if err == nil { + deleted = true + } } else { - err = l.SetLabelRuleLocked(rule) + err = l.SaveLabelRuleLocked(rule) } if err != nil { log.Error("failed to save rule expired label rule", zap.String("rule-key", key), zap.Error(err)) @@ -200,7 +202,7 @@ func (l *RegionLabeler) getAndCheckRule(id string, now time.Time) *LabelRule { l.DeleteLabelRuleLocked(id) return nil } - l.SetLabelRuleLocked(rule) + l.SaveLabelRuleLocked(rule) return rule } @@ -220,15 +222,21 @@ func (l *RegionLabeler) SetLabelRuleLocked(rule *LabelRule) error { if err := rule.checkAndAdjust(); err != nil { return err } - if err := l.storage.RunInTxn(l.ctx, func(txn kv.Txn) error { - return l.storage.SaveRegionRule(txn, rule.ID, rule) - }); err != nil { + if err := l.SaveLabelRuleLocked(rule); err != nil { return err } l.labelRules[rule.ID] = rule return nil } +// SaveLabelRuleLocked inserts or updates a LabelRule but not buildRangeList. +// It only saves the rule to storage, and does not update the in-memory states. +func (l *RegionLabeler) SaveLabelRuleLocked(rule *LabelRule) error { + return l.storage.RunInTxn(l.ctx, func(txn kv.Txn) error { + return l.storage.SaveRegionRule(txn, rule.ID, rule) + }) +} + // DeleteLabelRule removes a LabelRule. func (l *RegionLabeler) DeleteLabelRule(id string) error { l.Lock() From 83030100c676a0d4b3df3a0e4c7257961e76a1b2 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 19 Jan 2024 16:32:51 +0800 Subject: [PATCH 3/4] add unit test Signed-off-by: lhy1024 --- pkg/schedule/labeler/labeler_test.go | 44 ++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/pkg/schedule/labeler/labeler_test.go b/pkg/schedule/labeler/labeler_test.go index f38c6321c01..e5b8e4e7694 100644 --- a/pkg/schedule/labeler/labeler_test.go +++ b/pkg/schedule/labeler/labeler_test.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "sort" + "strconv" "testing" "time" @@ -132,6 +133,49 @@ func TestGetSetRule(t *testing.T) { for id, rule := range allRules { expectSameRules(re, rule, rules[id+1]) } + + for _, r := range rules { + labeler.DeleteLabelRule(r.ID) + } + re.Empty(labeler.GetAllLabelRules()) + + // test patch rules in batch + rulesNum := 200 + patch.SetRules = patch.SetRules[:0] + patch.DeleteRules = patch.DeleteRules[:0] + for i := 1; i <= rulesNum; i++ { + patch.SetRules = append(patch.SetRules, &LabelRule{ + ID: fmt.Sprintf("rule_%d", i), + Labels: []RegionLabel{ + {Key: fmt.Sprintf("k_%d", i), Value: fmt.Sprintf("v_%d", i)}, + }, + RuleType: "key-range", + Data: MakeKeyRanges("", ""), + }) + } + err = labeler.Patch(patch) + re.NoError(err) + allRules = labeler.GetAllLabelRules() + re.Len(allRules, rulesNum) + sort.Slice(allRules, func(i, j int) bool { + i1, err := strconv.Atoi(allRules[i].ID[5:]) + re.NoError(err) + j1, err := strconv.Atoi(allRules[j].ID[5:]) + re.NoError(err) + return i1 < j1 + }) + for id, rule := range allRules { + expectSameRules(re, rule, patch.SetRules[id]) + } + patch.SetRules = patch.SetRules[:0] + patch.DeleteRules = patch.DeleteRules[:0] + for i := 1; i <= rulesNum; i++ { + patch.DeleteRules = append(patch.DeleteRules, fmt.Sprintf("rule_%d", i)) + } + err = labeler.Patch(patch) + re.NoError(err) + allRules = labeler.GetAllLabelRules() + re.Empty(allRules) } func TestIndex(t *testing.T) { From 4b810e11fc2d479abd12509df6cf1fe54a2677f9 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 19 Jan 2024 16:54:32 +0800 Subject: [PATCH 4/4] address comments Signed-off-by: lhy1024 --- pkg/schedule/labeler/labeler.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/schedule/labeler/labeler.go b/pkg/schedule/labeler/labeler.go index 8e5a4a3268a..b33324b8a41 100644 --- a/pkg/schedule/labeler/labeler.go +++ b/pkg/schedule/labeler/labeler.go @@ -125,8 +125,10 @@ func (l *RegionLabeler) loadRules() error { if err != nil { return err } - for _, d := range toDelete { - if err = l.DeleteLabelRuleLocked(d); err != nil { + for _, id := range toDelete { + if err := l.storage.RunInTxn(l.ctx, func(txn kv.Txn) error { + return l.storage.DeleteRegionRule(txn, id) + }); err != nil { return err } }