replication_mode: use placement to determine canSync and hasMajority (#7202) (#7209) (#7285)

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
disksing and ti-chi-bot authored Oct 30, 2023
1 parent c5b59c8 commit 4ba0fa6
Showing 2 changed files with 289 additions and 59 deletions.
113 changes: 75 additions & 38 deletions server/replication/replication_mode.go
@@ -33,6 +33,7 @@ import (
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/storage/endpoint"
"go.uber.org/zap"
)
@@ -363,37 +364,72 @@ func (m *ModeManager) Run(ctx context.Context) {
wg.Wait()
}

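// minimalUpVoters returns a lower bound on how many voter replicas of the
// given rule are guaranteed to stay on up stores: rule.Count minus the
// matching down stores, clamped to the range [0, matching up stores].
// Learner rules contribute no voters. For example, a rule with Count=3,
// 2 matching up stores and 1 matching down store yields 2.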
func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int {
if rule.Role == placement.Learner {
return 0
}
var up, down int
for _, s := range upStores {
if placement.MatchLabelConstraints(s, rule.LabelConstraints) {
up++
}
}
for _, s := range downStores {
if placement.MatchLabelConstraints(s, rule.LabelConstraints) {
down++
}
}
minimalUp := rule.Count - down
if minimalUp < 0 {
minimalUp = 0
}
if minimalUp > up {
minimalUp = up
}
return minimalUp
}

func (m *ModeManager) tickUpdateState() {
if m.getModeName() != modeDRAutoSync {
return
}

drTickCounter.Inc()

totalPrimaryPeers, totalDrPeers := m.config.DRAutoSync.PrimaryReplicas, m.config.DRAutoSync.DRReplicas
stores := m.checkStoreStatus()
stores, storeIDs := m.checkStoreStatus()

// canSync is true when every region has at least 1 replica in each DC.
canSync := len(stores[primaryDown]) < totalPrimaryPeers && len(stores[drDown]) < totalDrPeers &&
len(stores[primaryUp]) > 0 && len(stores[drUp]) > 0
var primaryHasVoter, drHasVoter bool
var totalVoter, totalUpVoter int
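// For every global placement rule, estimate how many of its voters are
// guaranteed to remain on up stores in the primary and DR DCs.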
for _, r := range m.cluster.GetRuleManager().GetAllRules() {
if len(r.StartKey) > 0 || len(r.EndKey) > 0 {
// Only global rules (covering the whole key range) are considered; skip the rest.
continue
}
if r.Role != placement.Learner {
totalVoter += r.Count
}
minimalUpPrimary := minimalUpVoters(r, stores[primaryUp], stores[primaryDown])
minimalUpDr := minimalUpVoters(r, stores[drUp], stores[drDown])
primaryHasVoter = primaryHasVoter || minimalUpPrimary > 0
drHasVoter = drHasVoter || minimalUpDr > 0
upVoters := minimalUpPrimary + minimalUpDr
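// A region carries at most r.Count voters for this rule, so cap the combined estimate.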
if upVoters > r.Count {
upVoters = r.Count
}
totalUpVoter += upVoters
}

// canSync is true when every region has at least 1 voter replica in each DC.
// hasMajority is true when every region has a majority of its peers online.
var upPeers int
if len(stores[primaryDown]) < totalPrimaryPeers {
upPeers += totalPrimaryPeers - len(stores[primaryDown])
}
if len(stores[drDown]) < totalDrPeers {
upPeers += totalDrPeers - len(stores[drDown])
}
hasMajority := upPeers*2 > totalPrimaryPeers+totalDrPeers
canSync := primaryHasVoter && drHasVoter
hasMajority := totalUpVoter*2 > totalVoter
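// For example, assuming one voter rule per DC with counts 3 (primary) and 2 (dr),
// totalVoter is 5 and hasMajority requires totalUpVoter >= 3.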

log.Debug("replication store status",
zap.Uint64s("up-primary", stores[primaryUp]),
zap.Uint64s("up-dr", stores[drUp]),
zap.Uint64s("down-primary", stores[primaryDown]),
zap.Uint64s("down-dr", stores[drDown]),
zap.Uint64s("up-primary", storeIDs[primaryUp]),
zap.Uint64s("up-dr", storeIDs[drUp]),
zap.Uint64s("down-primary", storeIDs[primaryDown]),
zap.Uint64s("down-dr", storeIDs[drDown]),
zap.Bool("can-sync", canSync),
zap.Int("up-peers", upPeers),
zap.Bool("has-majority", hasMajority),
)

@@ -419,31 +455,31 @@ func (m *ModeManager) tickUpdateState() {
case drStateSync:
// If hasMajority is false, the cluster is unavailable either way; switching to async won't help.
if !canSync && hasMajority {
m.drSwitchToAsyncWait(stores[primaryUp])
m.drSwitchToAsyncWait(storeIDs[primaryUp])
}
case drStateAsyncWait:
if canSync {
m.drSwitchToSync()
break
}
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, stores[primaryUp]) {
m.drSwitchToAsyncWait(stores[primaryUp])
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs[primaryUp]) {
m.drSwitchToAsyncWait(storeIDs[primaryUp])
break
}
if m.drCheckStoreStateUpdated(stores[primaryUp]) {
m.drSwitchToAsync(stores[primaryUp])
if m.drCheckStoreStateUpdated(storeIDs[primaryUp]) {
m.drSwitchToAsync(storeIDs[primaryUp])
}
case drStateAsync:
if canSync {
m.drSwitchToSyncRecover()
break
}
if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(stores[primaryUp]) {
m.drSwitchToAsync(stores[primaryUp])
if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs[primaryUp]) {
m.drSwitchToAsync(storeIDs[primaryUp])
}
case drStateSyncRecover:
if !canSync && hasMajority {
m.drSwitchToAsync(stores[primaryUp])
m.drSwitchToAsync(storeIDs[primaryUp])
} else {
m.updateProgress()
progress := m.estimateProgress()
@@ -518,39 +554,40 @@ const (
storeStatusTypeCount
)

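// checkStoreStatus groups the cluster's stores (skipping removed and
// learner-only stores) into primary/dr up/down buckets and returns both the
// store objects and their IDs, each sorted by store ID.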
func (m *ModeManager) checkStoreStatus() [][]uint64 {
func (m *ModeManager) checkStoreStatus() ([][]*core.StoreInfo, [][]uint64) {
m.RLock()
defer m.RUnlock()
stores := make([][]uint64, storeStatusTypeCount)
stores, storeIDs := make([][]*core.StoreInfo, storeStatusTypeCount), make([][]uint64, storeStatusTypeCount)
for _, s := range m.cluster.GetStores() {
if s.IsRemoved() {
continue
}
// learner peers do not take part in majority commit or voting, so a store holding only learner peers is not counted as a normal primary/dr store.
if s.GetRegionCount() == s.GetLearnerCount() {
continue
}
down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
if labelValue == m.config.DRAutoSync.Primary {
if down {
stores[primaryDown] = append(stores[primaryDown], s.GetID())
stores[primaryDown] = append(stores[primaryDown], s)
storeIDs[primaryDown] = append(storeIDs[primaryDown], s.GetID())
} else {
stores[primaryUp] = append(stores[primaryUp], s.GetID())
stores[primaryUp] = append(stores[primaryUp], s)
storeIDs[primaryUp] = append(storeIDs[primaryUp], s.GetID())
}
}
if labelValue == m.config.DRAutoSync.DR {
if down {
stores[drDown] = append(stores[drDown], s.GetID())
stores[drDown] = append(stores[drDown], s)
storeIDs[drDown] = append(storeIDs[drDown], s.GetID())
} else {
stores[drUp] = append(stores[drUp], s.GetID())
stores[drUp] = append(stores[drUp], s)
storeIDs[drUp] = append(storeIDs[drUp], s.GetID())
}
}
}
for i := range stores {
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a] < stores[i][b] })
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() })
sort.Slice(storeIDs[i], func(a, b int) bool { return storeIDs[i][a] < storeIDs[i][b] })
}
return stores
return stores, storeIDs
}

// UpdateStoreDRStatus saves the dr-autosync status of a store.