Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: disable closest-adaptive dynamically to make read traffic more even #38960

Merged
merged 7 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 62 additions & 22 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package domain
import (
"context"
"fmt"
"math"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -1105,8 +1107,12 @@ func (do *Domain) closestReplicaReadCheckLoop(ctx context.Context, pdClient pd.C
}
}

// Periodically check and update the replica-read status when `tidb_replica_read` is set to "closest-adaptive"
// We disable "closest-adaptive" in following conditions to ensure the read traffic is evenly distributed across
// all AZs:
// - There are no TiKV servers in the AZ of this tidb instance
// - The AZ if this tidb contains more tidb than other AZ and this tidb's id is the bigger one.
func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) error {
// fast path
do.sysVarCache.RLock()
replicaRead := do.sysVarCache.global[variable.TiDBReplicaRead]
do.sysVarCache.RUnlock()
Expand All @@ -1115,6 +1121,24 @@ func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) erro
logutil.BgLogger().Debug("closest replica read is not enabled, skip check!", zap.String("mode", replicaRead))
return nil
}

serverInfo, err := infosync.GetServerInfo()
if err != nil {
return err
}
zone := ""
for k, v := range serverInfo.Labels {
if k == placement.DCLabelKey && v != "" {
zone = v
break
}
}
if zone == "" {
logutil.BgLogger().Debug("server contains no 'zone' label, disable closest replica read", zap.Any("labels", serverInfo.Labels))
variable.SetEnableAdaptiveReplicaRead(false)
return nil
}

stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return err
Expand All @@ -1134,32 +1158,48 @@ func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) erro
}
}

enabled := false
// if stores don't have zone labels or are distribued in 1 zone, just disable cloeset replica read.
if len(storeZones) > 1 {
enabled = true
servers, err := infosync.GetAllServerInfo(ctx)
if err != nil {
return err
}
for _, s := range servers {
if v, ok := s.Labels[placement.DCLabelKey]; ok && v != "" {
if _, ok := storeZones[v]; !ok {
enabled = false
break
}
// no stores in this AZ
if _, ok := storeZones[zone]; !ok {
variable.SetEnableAdaptiveReplicaRead(false)
return nil
}

servers, err := infosync.GetAllServerInfo(ctx)
if err != nil {
return err
}
svrIdsInThisZone := make([]string, 0)
for _, s := range servers {
if v, ok := s.Labels[placement.DCLabelKey]; ok && v != "" {
if _, ok := storeZones[v]; ok {
storeZones[v] += 1
}
}
if enabled {
for _, count := range storeZones {
if count == 0 {
enabled = false
break
if v == zone {
svrIdsInThisZone = append(svrIdsInThisZone, s.ID)
}
}
}
}
enabledCount := math.MaxInt
for _, count := range storeZones {
if count < enabledCount {
enabledCount = count
}
}
// sort tidb in the same AZ by ID and disable the tidb with bigger ID
// because ID is unchangeable, so this is a simple and stable algorithm to select
// some instances across all tidb servers.
if enabledCount < len(svrIdsInThisZone) {
sort.Slice(svrIdsInThisZone, func(i, j int) bool {
return strings.Compare(svrIdsInThisZone[i], svrIdsInThisZone[j]) < 0
})
}
xhebox marked this conversation as resolved.
Show resolved Hide resolved
enabled := true
for _, s := range svrIdsInThisZone[enabledCount:] {
if s == serverInfo.ID {
enabled = false
break
}
}

if variable.SetEnableAdaptiveReplicaRead(enabled) {
logutil.BgLogger().Info("tidb server adaptive closest replica read is changed", zap.Bool("enable", enabled))
Expand Down
86 changes: 86 additions & 0 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,24 @@ func TestClosestReplicaReadChecker(t *testing.T) {
}
dom.sysVarCache.Unlock()

mockedAllServerInfos := map[string]*infosync.ServerInfo{
"s1": {
ID: "s1",
Labels: map[string]string{
"zone": "zone1",
},
},
"s2": {
ID: "s2",
Labels: map[string]string{
"zone": "zone2",
},
},
}
infosync.SetAllServerInfo4Test(mockedAllServerInfos)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", `return("")`))
infosync.SetServerInfo4Test(mockedAllServerInfos["s2"])
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetServerInfo", `return("")`))

stores := []*metapb.Store{
{
Expand Down Expand Up @@ -304,8 +321,77 @@ func TestClosestReplicaReadChecker(t *testing.T) {
require.False(t, variable.IsAdaptiveReplicaReadEnabled())
}

// partial matches
mockedAllServerInfos = map[string]*infosync.ServerInfo{
"s1": {
ID: "s1",
Labels: map[string]string{
"zone": "zone1",
},
},
"s2": {
ID: "s2",
Labels: map[string]string{
"zone": "zone2",
},
},
"s22": {
ID: "s22",
Labels: map[string]string{
"zone": "zone2",
},
},
"s3": {
ID: "s3",
Labels: map[string]string{
"zone": "zone3",
},
},
"s4": {
ID: "s4",
Labels: map[string]string{
"zone": "zone4",
},
},
}
pdClient.stores = stores
infosync.SetAllServerInfo4Test(mockedAllServerInfos)
cases := []struct {
id string
matches bool
}{
{
id: "s1",
matches: true,
},
{
id: "s2",
matches: true,
},
{
id: "s22",
matches: false,
},
{
id: "s3",
matches: true,
},
{
id: "s4",
matches: false,
},
}
for _, c := range cases {
infosync.SetServerInfo4Test(mockedAllServerInfos[c.id])
variable.SetEnableAdaptiveReplicaRead(!c.matches)
err = dom.checkReplicaRead(ctx, pdClient)
require.Nil(t, err)
require.Equal(t, c.matches, variable.IsAdaptiveReplicaReadEnabled())
}

variable.SetEnableAdaptiveReplicaRead(true)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockGetServerInfo"))
}

type mockInfoPdClient struct {
Expand Down
33 changes: 20 additions & 13 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ func SetMockTiFlash(tiflash *MockTiFlash) {

// GetServerInfo gets self server static information.
func GetServerInfo() (*ServerInfo, error) {
failpoint.Inject("mockGetServerInfo", func() {
failpoint.Return(serverInfo4Test, nil)
})
is, err := getGlobalInfoSyncer()
if err != nil {
return nil, err
Expand Down Expand Up @@ -314,22 +317,26 @@ func (is *InfoSyncer) getServerInfoByID(ctx context.Context, id string) (*Server
return info, nil
}

// global variables only used in failpoint tests.
var (
allServerInfo4Test map[string]*ServerInfo
serverInfo4Test *ServerInfo
)

// SetAllServerInfo4Test set the value of `allServerInfo4Test` used in unit test.
func SetAllServerInfo4Test(infos map[string]*ServerInfo) {
allServerInfo4Test = infos
}

// SetServerInfo4Test set the value of `serverInfo4Test` used in unit test.
func SetServerInfo4Test(info *ServerInfo) {
serverInfo4Test = info
}

// GetAllServerInfo gets all servers static information from etcd.
func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
failpoint.Inject("mockGetAllServerInfo", func() {
res := map[string]*ServerInfo{
"fa598405-a08e-4e74-83ff-75c30b1daedc": {
Labels: map[string]string{
"zone": "zone1",
},
},
"ad84dbbd-5a50-4742-a73c-4f674d41d4bd": {
Labels: map[string]string{
"zone": "zone2",
},
},
}
failpoint.Return(res, nil)
failpoint.Return(allServerInfo4Test, nil)
xhebox marked this conversation as resolved.
Show resolved Hide resolved
})
is, err := getGlobalInfoSyncer()
if err != nil {
Expand Down