DDL: Remove TiFlash replica progress etcd dependency #38564

Merged · 23 commits · Oct 31, 2022
95 changes: 25 additions & 70 deletions ddl/ddl_tiflash_api.go
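In short, this PR stops pushing TiFlash replica sync progress to etcd (the old infosync.UpdateTiFlashTableSyncProgress call plus a per-owner ProgressCache map in the DDL loop). Progress is now computed via infosync.CalculateTiFlashProgress and kept in a process-local cache behind infosync.UpdateTiFlashProgressCache / infosync.CleanTiFlashProgressCache. A minimal sketch of what such a cache could look like, assuming a mutex-guarded map — the function names match the calls in this diff, but the internals here are assumptions, not the actual infosync implementation:

    package infosync

    import "sync"

    // Hypothetical sketch of the in-memory progress cache that replaces
    // the old etcd writes; layout is an assumption.
    var progressCache = struct {
        sync.RWMutex
        m map[int64]float64
    }{m: make(map[int64]float64)}

    // UpdateTiFlashProgressCache records the latest sync progress of a table.
    func UpdateTiFlashProgressCache(tableID int64, progress float64) error {
        progressCache.Lock()
        defer progressCache.Unlock()
        progressCache.m[tableID] = progress
        return nil
    }

    // CleanTiFlashProgressCache drops all cached entries, e.g. when this
    // node is no longer the DDL owner.
    func CleanTiFlashProgressCache() {
        progressCache.Lock()
        defer progressCache.Unlock()
        progressCache.m = make(map[int64]float64)
    }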
@@ -41,7 +41,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
@@ -117,7 +116,6 @@ func NewPollTiFlashBackoffContext(MinThreshold, MaxThreshold TiFlashTick, Capaci
type TiFlashManagementContext struct {
TiFlashStores map[int64]helper.StoreStat
PollCounter uint64
ProgressCache map[int64]string
Backoff *PollTiFlashBackoffContext
// tables waiting for progress updates after becoming available.
UpdatingProgressTables *list.List
@@ -212,7 +210,6 @@ func NewTiFlashManagementContext() (*TiFlashManagementContext, error) {
return &TiFlashManagementContext{
PollCounter: 0,
TiFlashStores: make(map[int64]helper.StoreStat),
ProgressCache: make(map[int64]string),
Backoff: c,
UpdatingProgressTables: list.New(),
}, nil
@@ -235,8 +232,6 @@ var (
PollTiFlashBackoffRate TiFlashTick = 1.5
// RefreshProgressMaxTableCount is the max number of tables whose progress is refreshed per poll after they become available.
RefreshProgressMaxTableCount uint64 = 1000
// PollCleanProgressCacheInterval is the interval (PollTiFlashInterval * PollCleanProgressCacheInterval) at which the progress cache is cleaned, to avoid data races on DDL owner switchover
PollCleanProgressCacheInterval uint64 = 300
)
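For context on PollTiFlashBackoffRate: unavailable tables are re-polled with a growing interval. Assuming the threshold simply multiplies by the rate and is capped at a maximum (the exact update rule inside PollTiFlashBackoffContext is not shown in this diff), the wait sequence would look like:

    package main

    import "fmt"

    // Sketch of a multiplicative backoff; the min/max values are made up
    // for illustration, only the 1.5 rate comes from this diff.
    func main() {
        const (
            minThreshold = 1.0
            maxThreshold = 10.0
            rate         = 1.5
        )
        for t := minThreshold; ; t *= rate {
            if t > maxThreshold {
                t = maxThreshold
            }
            fmt.Printf("wait %.2f ticks\n", t) // 1.00, 1.50, 2.25, ... 10.00
            if t == maxThreshold {
                break
            }
        }
    }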

func getTiflashHTTPAddr(host string, statusAddr string) (string, error) {
@@ -376,34 +371,6 @@ func getTiFlashPeerWithoutLagCount(pollTiFlashContext *TiFlashManagementContext,
return flashPeerCount, nil
}

// getTiFlashTableSyncProgress returns progress as a truncated string to avoid float64 comparison.
func getTiFlashTableSyncProgress(pollTiFlashContext *TiFlashManagementContext, tableID int64, replicaCount uint64) (string, error) {
var regionCount int
if err := infosync.GetTiFlashRegionCountFromPD(context.Background(), tableID, &regionCount); err != nil {
logutil.BgLogger().Error("Fail to get regionCount from PD.",
zap.Int64("tableID", tableID))
return "0", errors.Trace(err)
}

tiflashPeerCount, err := getTiFlashPeerWithoutLagCount(pollTiFlashContext, tableID)
if err != nil {
logutil.BgLogger().Error("Fail to get peer count from TiFlash.",
zap.Int64("tableID", tableID))
return "0", errors.Trace(err)
}
progress := float64(tiflashPeerCount) / float64(regionCount*int(replicaCount))
if progress > 1 { // possible while PD is balancing regions
logutil.BgLogger().Debug("TiFlash peer count > pd peer count, maybe doing balance.",
zap.Int64("tableID", tableID), zap.Int("tiflashPeerCount", tiflashPeerCount), zap.Int("regionCount", regionCount), zap.Uint64("replicaCount", replicaCount))
progress = 1
}
if progress < 1 {
logutil.BgLogger().Debug("TiFlash replica progress < 1.",
zap.Int64("tableID", tableID), zap.Int("tiflashPeerCount", tiflashPeerCount), zap.Int("regionCount", regionCount), zap.Uint64("replicaCount", replicaCount))
}
return types.TruncateFloatToString(progress, 2), nil
}
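The helper removed above computed progress as a string truncated to two decimals; its replacement, infosync.CalculateTiFlashProgress, yields a float64 and takes the store stats directly. A sketch of the core ratio, mirroring the removed code (the real function also fetches the region and peer counts itself, which is an assumption here):

    // calculateProgress mirrors the removed logic: the fraction of expected
    // TiFlash peers (regions * replicas) that are present and not lagging,
    // clamped to 1.0 while PD is rebalancing.
    func calculateProgress(tiflashPeerCount, regionCount int, replicaCount uint64) float64 {
        expected := regionCount * int(replicaCount)
        if expected == 0 {
            return 0
        }
        progress := float64(tiflashPeerCount) / float64(expected)
        if progress > 1 {
            progress = 1
        }
        return progress
    }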

func pollAvailableTableProgress(schemas infoschema.InfoSchema, ctx sessionctx.Context, pollTiFlashContext *TiFlashManagementContext) {
pollMaxCount := RefreshProgressMaxTableCount
failpoint.Inject("PollAvailableTableProgressMaxCount", func(val failpoint.Value) {
@@ -445,7 +412,7 @@ func pollAvailableTableProgress(schemas infoschema.InfoSchema, ctx sessionctx.Co
element = element.Next()
continue
}
progress, err := getTiFlashTableSyncProgress(pollTiFlashContext, availableTableID.ID, tableInfo.TiFlashReplica.Count)
progress, err := infosync.CalculateTiFlashProgress(availableTableID.ID, tableInfo.TiFlashReplica.Count, pollTiFlashContext.TiFlashStores)
if err != nil {
logutil.BgLogger().Error("get tiflash sync progress failed",
zap.Error(err),
@@ -454,18 +421,15 @@ func pollAvailableTableProgress(schemas infoschema.InfoSchema, ctx sessionctx.Co
)
continue
}
if pollTiFlashContext.ProgressCache[availableTableID.ID] != progress {
err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), availableTableID.ID, progress)
if err != nil {
logutil.BgLogger().Error("updating TiFlash replica process failed",
zap.Error(err),
zap.Int64("tableID or partitionID", availableTableID.ID),
zap.Bool("IsPartition", availableTableID.IsPartition),
zap.String("progress", progress),
)
continue
}
pollTiFlashContext.ProgressCache[availableTableID.ID] = progress
err = infosync.UpdateTiFlashProgressCache(availableTableID.ID, progress)
if err != nil {
logutil.BgLogger().Error("update tiflash sync progress cache failed",
zap.Error(err),
zap.Int64("tableID", availableTableID.ID),
zap.Bool("IsPartition", availableTableID.IsPartition),
zap.Float64("progress", progress),
)
continue
}
next := element.Next()
pollTiFlashContext.UpdatingProgressTables.Remove(element)
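A side note on the loop above: `next := element.Next()` is captured before `Remove(element)` because container/list clears an element's links on removal. The pattern in isolation:

    package main

    import (
        "container/list"
        "fmt"
    )

    func main() {
        l := list.New()
        for _, id := range []int64{101, 102, 103} {
            l.PushBack(id)
        }
        for e := l.Front(); e != nil; {
            next := e.Next() // must be read before Remove invalidates e
            if e.Value.(int64) == 102 {
                l.Remove(e)
            }
            e = next
        }
        for e := l.Front(); e != nil; e = e.Next() {
            fmt.Println(e.Value) // prints 101 then 103
        }
    }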
@@ -481,14 +445,6 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T
return err
}
}

failpoint.Inject("PollTiFlashReplicaStatusCleanProgressCache", func() {
pollTiFlashContext.PollCounter = PollCleanProgressCacheInterval
})
// Clean the progress cache every 10 minutes to avoid data races
if pollTiFlashContext.PollCounter > 0 && pollTiFlashContext.PollCounter%PollCleanProgressCacheInterval == 0 {
pollTiFlashContext.ProgressCache = make(map[int64]string)
}
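The periodic wipe deleted above existed only to keep the per-owner string cache from going stale across DDL owner switchovers. With the cache moved into infosync, the PR instead clears it when a node sees it is not the owner (the CleanTiFlashProgressCache call at the end of this diff). Roughly, and assuming the owner-check shape used elsewhere in the ddl package (not shown in this diff):

    // Simplified from PollTiFlashRoutine; the IsOwner call is an assumption.
    if d.ownerManager.IsOwner() {
        // Only the owner computes and caches replica progress.
        _ = d.refreshTiFlashTicker(sctx, pollTiflashContext)
    } else {
        // Non-owners drop cached progress so a later election starts fresh.
        infosync.CleanTiFlashProgressCache()
    }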
pollTiFlashContext.PollCounter++

// Start to process every table.
@@ -530,37 +486,36 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T
continue
}

progress, err := getTiFlashTableSyncProgress(pollTiFlashContext, tb.ID, tb.Count)
progress, err := infosync.CalculateTiFlashProgress(tb.ID, tb.Count, pollTiFlashContext.TiFlashStores)
if err != nil {
logutil.BgLogger().Error("get tiflash sync progress failed",
zap.Error(err),
zap.Int64("tableID", tb.ID),
)
continue
}
if pollTiFlashContext.ProgressCache[tb.ID] != progress {
err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), tb.ID, progress)
if err != nil {
logutil.BgLogger().Error("updating TiFlash replica process failed",
zap.Error(err),
zap.Int64("tableID", tb.ID),
zap.String("progress", progress),
)
continue
}
pollTiFlashContext.ProgressCache[tb.ID] = progress

err = infosync.UpdateTiFlashProgressCache(tb.ID, progress)
if err != nil {
logutil.BgLogger().Error("get tiflash sync progress from cache failed",
zap.Error(err),
zap.Int64("tableID", tb.ID),
zap.Bool("IsPartition", tb.IsPartition),
zap.Float64("progress", progress),
)
continue
}

avail := progress[0] == '1'
avail := progress == 1
failpoint.Inject("PollTiFlashReplicaStatusReplaceCurAvailableValue", func(val failpoint.Value) {
avail = val.(bool)
})

if !avail {
logutil.BgLogger().Info("Tiflash replica is not available", zap.Int64("tableID", tb.ID), zap.String("progress", progress))
logutil.BgLogger().Info("Tiflash replica is not available", zap.Int64("tableID", tb.ID), zap.Float64("progress", progress))
pollTiFlashContext.Backoff.Put(tb.ID)
} else {
logutil.BgLogger().Info("Tiflash replica is available", zap.Int64("tableID", tb.ID), zap.String("progress", progress))
logutil.BgLogger().Info("Tiflash replica is available", zap.Int64("tableID", tb.ID), zap.Float64("progress", progress))
pollTiFlashContext.Backoff.Remove(tb.ID)
}
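On the failpoint.Inject hooks here: with github.com/pingcap/failpoint, a test enables a failpoint by its package-qualified name. A sketch, assuming the standard tidb/ddl failpoint path (the helper name is hypothetical):

    import "github.com/pingcap/failpoint"

    // Force `avail` to a fixed value inside refreshTiFlashTicker for a test.
    func withForcedAvailability(run func()) error {
        fp := "github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue"
        if err := failpoint.Enable(fp, `return(true)`); err != nil {
            return err
        }
        defer func() { _ = failpoint.Disable(fp) }()
        run()
        return nil
    }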
failpoint.Inject("skipUpdateTableReplicaInfoInLoop", func() {
@@ -736,7 +691,7 @@ func (d *ddl) PollTiFlashRoutine() {
}
}
} else {
pollTiflashContext.ProgressCache = make(map[int64]string)
infosync.CleanTiFlashProgressCache()
}
d.sessPool.put(sctx)
} else {