This is an automated cherry-pick of pingcap#40872
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
hehechen authored and ti-chi-bot committed Jan 31, 2023
1 parent a046a2d commit 7a601c5
Showing 3 changed files with 129 additions and 0 deletions.
12 changes: 12 additions & 0 deletions ddl/ddl_tiflash_api.go
@@ -365,6 +365,18 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
if err := d.UpdateTiFlashHTTPAddress(&s); err != nil {
}
}
<<<<<<< HEAD
=======

failpoint.Inject("OneTiFlashStoreDown", func() {
for storeID, store := range pollTiFlashContext.TiFlashStores {
store.Store.StateName = "Down"
pollTiFlashContext.TiFlashStores[storeID] = store
break
}
})
pollTiFlashContext.PollCounter++
>>>>>>> c8bffd42c2 (DDL: Skip collecting TiFlash status when TiFlash is down (#40872))

// Start to process every table.
schema := d.GetInfoSchemaWithInterceptor(ctx)
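
For context on the injection above: failpoint.Inject only marks an injection site; in TiDB builds the marker is rewritten (by failpoint-ctl, typically driven by make failpoint-enable) into a runtime check that fires only while the named failpoint is enabled. Below is a minimal standalone sketch of that pattern, assuming direct use of the github.com/pingcap/failpoint runtime API and a hypothetical failpoint name example/OneStoreDown; it is an illustration of the mechanism, not code from this commit.

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

// markOneStoreDown mimics the OneTiFlashStoreDown injection above: while the
// failpoint is enabled, the first store found in the map is flipped to "Down".
func markOneStoreDown(stores map[int64]string) {
	if _, err := failpoint.Eval("example/OneStoreDown"); err == nil {
		for id := range stores {
			stores[id] = "Down"
			break
		}
	}
}

func main() {
	stores := map[int64]string{1: "Up", 2: "Up"}

	markOneStoreDown(stores)
	fmt.Println(stores) // failpoint disabled: both stores stay "Up"

	if err := failpoint.Enable("example/OneStoreDown", "return"); err != nil {
		panic(err)
	}
	defer func() { _ = failpoint.Disable("example/OneStoreDown") }()

	markOneStoreDown(stores)
	fmt.Println(stores) // failpoint enabled: one store now reports "Down"
}
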
20 changes: 20 additions & 0 deletions ddl/ddl_tiflash_test.go
@@ -1049,3 +1049,23 @@ func TestTiFlashAvailableAfterAddPartition(t *testing.T) {
require.NotNil(t, pi)
require.Equal(t, len(pi.Definitions), 2)
}

func TestTiFlashAvailableAfterDownOneStore(t *testing.T) {
s, teardown := createTiFlashContext(t)
defer teardown()
tk := testkit.NewTestKit(t, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists ddltiflash")
tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown", `return`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown", `return`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown"))
}()

tk.MustExec("alter table ddltiflash set tiflash replica 1")
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
CheckTableAvailable(s.dom, t, 1, []string{})
}
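
A usage note, assuming TiDB's standard failpoint workflow rather than anything stated in this diff: the failpoint.Inject markers are inert no-ops until failpoints are activated in the source tree (e.g. via make failpoint-enable), so the injected one-store-down behaviour only takes effect when this test runs against a failpoint-enabled build.
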
97 changes: 97 additions & 0 deletions domain/infosync/tiflash_manager.go
@@ -31,6 +31,7 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/tablecodec"
@@ -71,6 +72,102 @@ func (m *TiFlashPDPlacementManager) Close(ctx context.Context) {

}

<<<<<<< HEAD
=======
func getTiFlashPeerWithoutLagCount(tiFlashStores map[int64]helper.StoreStat, tableID int64) (int, error) {
// storeIDs -> regionID; PD will not create two peers on the same store
var flashPeerCount int
for _, store := range tiFlashStores {
regionReplica := make(map[int64]int)
err := helper.CollectTiFlashStatus(store.Store.StatusAddress, tableID, &regionReplica)
failpoint.Inject("OneTiFlashStoreDown", func() {
if store.Store.StateName == "Down" {
err = errors.New("mock TiFlash down")
}
})
if err != nil {
logutil.BgLogger().Error("Fail to get peer status from TiFlash.",
zap.Int64("tableID", tableID))
// Just skip down, offline, or tombstone stores, because PD will migrate regions from these stores.
if store.Store.StateName == "Up" || store.Store.StateName == "Disconnected" {
return 0, err
}
continue
}
flashPeerCount += len(regionReplica)
}
return flashPeerCount, nil
}
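
A hypothetical walk-through of the skip logic above: if two TiFlash stores each report 30 in-sync regions for the table, flashPeerCount ends up as 60; if one of those stores is in state "Down" (or "Offline"/"Tombstone") and its status request fails, that store is simply skipped and the count is 30, whereas the same failure on an "Up" or "Disconnected" store aborts the whole calculation with an error.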

// calculateTiFlashProgress calculates progress based on the region status from PD and TiFlash.
func calculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]helper.StoreStat) (float64, error) {
var regionCount int
if err := GetTiFlashRegionCountFromPD(context.Background(), tableID, &regionCount); err != nil {
logutil.BgLogger().Error("Fail to get regionCount from PD.",
zap.Int64("tableID", tableID))
return 0, errors.Trace(err)
}

if regionCount == 0 {
logutil.BgLogger().Warn("region count getting from PD is 0.",
zap.Int64("tableID", tableID))
return 0, fmt.Errorf("region count getting from PD is 0")
}

tiflashPeerCount, err := getTiFlashPeerWithoutLagCount(tiFlashStores, tableID)
if err != nil {
logutil.BgLogger().Error("Fail to get peer count from TiFlash.",
zap.Int64("tableID", tableID))
return 0, errors.Trace(err)
}
progress := float64(tiflashPeerCount) / float64(regionCount*int(replicaCount))
if progress > 1 { // when PD is balancing regions
logutil.BgLogger().Debug("TiFlash peer count > pd peer count, maybe doing balance.",
zap.Int64("tableID", tableID), zap.Int("tiflashPeerCount", tiflashPeerCount), zap.Int("regionCount", regionCount), zap.Uint64("replicaCount", replicaCount))
progress = 1
}
if progress < 1 {
logutil.BgLogger().Debug("TiFlash replica progress < 1.",
zap.Int64("tableID", tableID), zap.Int("tiflashPeerCount", tiflashPeerCount), zap.Int("regionCount", regionCount), zap.Uint64("replicaCount", replicaCount))
}
return progress, nil
}
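
As a worked example of the formula above (hypothetical numbers): with regionCount = 50, replicaCount = 2, and tiflashPeerCount = 75, progress = 75 / (50 * 2) = 0.75; a value above 1, which can occur while PD is balancing regions, is clamped to 1.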

// CalculateTiFlashProgress calculates TiFlash replica progress.
func (m *TiFlashReplicaManagerCtx) CalculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]helper.StoreStat) (float64, error) {
return calculateTiFlashProgress(tableID, replicaCount, tiFlashStores)
}

// UpdateTiFlashProgressCache updates tiflashProgressCache
func (m *TiFlashReplicaManagerCtx) UpdateTiFlashProgressCache(tableID int64, progress float64) {
m.Lock()
defer m.Unlock()
m.tiflashProgressCache[tableID] = progress
}

// GetTiFlashProgressFromCache gets tiflash replica progress from tiflashProgressCache
func (m *TiFlashReplicaManagerCtx) GetTiFlashProgressFromCache(tableID int64) (float64, bool) {
m.RLock()
defer m.RUnlock()
progress, ok := m.tiflashProgressCache[tableID]
return progress, ok
}

// DeleteTiFlashProgressFromCache deletes the TiFlash replica progress from tiflashProgressCache
func (m *TiFlashReplicaManagerCtx) DeleteTiFlashProgressFromCache(tableID int64) {
m.Lock()
defer m.Unlock()
delete(m.tiflashProgressCache, tableID)
}

// CleanTiFlashProgressCache cleans the progress cache
func (m *TiFlashReplicaManagerCtx) CleanTiFlashProgressCache() {
m.Lock()
defer m.Unlock()
m.tiflashProgressCache = make(map[int64]float64)
}

>>>>>>> c8bffd42c2 (DDL: Skip collecting TiFlash status when TiFlash is down (#40872))
// SetTiFlashGroupConfig sets the tiflash's rule group config
func (m *TiFlashPDPlacementManager) SetTiFlashGroupConfig(ctx context.Context) error {
res, err := doRequest(ctx,
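
The progress-cache accessors added above (UpdateTiFlashProgressCache, GetTiFlashProgressFromCache, DeleteTiFlashProgressFromCache, CleanTiFlashProgressCache) all follow the same read-write-locked map pattern. Below is a standalone sketch of that pattern with hypothetical names and no dependence on the TiDB types; it only illustrates the locking discipline, not the actual manager.

package main

import (
	"fmt"
	"sync"
)

// progressCache mirrors the pattern used by tiflashProgressCache above:
// writers take the exclusive lock, readers take the shared lock.
type progressCache struct {
	sync.RWMutex
	progress map[int64]float64
}

func newProgressCache() *progressCache {
	return &progressCache{progress: make(map[int64]float64)}
}

func (c *progressCache) update(tableID int64, p float64) {
	c.Lock()
	defer c.Unlock()
	c.progress[tableID] = p
}

func (c *progressCache) get(tableID int64) (float64, bool) {
	c.RLock()
	defer c.RUnlock()
	p, ok := c.progress[tableID]
	return p, ok
}

func (c *progressCache) delete(tableID int64) {
	c.Lock()
	defer c.Unlock()
	delete(c.progress, tableID)
}

func main() {
	c := newProgressCache()
	c.update(42, 0.75)
	fmt.Println(c.get(42)) // 0.75 true
	c.delete(42)
	fmt.Println(c.get(42)) // 0 false
}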
