Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
filter out all TiFlash nodes when retrieving lists of stores from PD (#…
Browse files Browse the repository at this point in the history
…187)

* conn: ignore nodes with label engine=tiflash

* conn: disallow TiFlash on restore, only skip TiFlash on backup
  • Loading branch information
kennytm authored Mar 16, 2020
1 parent 512855d commit 6b88e51
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 10 deletions.
3 changes: 2 additions & 1 deletion pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
Expand Down Expand Up @@ -395,7 +396,7 @@ func (bc *Client) BackupRange(
defer cancel()

var allStores []*metapb.Store
allStores, err = bc.mgr.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
Expand Down
51 changes: 49 additions & 2 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,61 @@ func pdRequest(
return r, nil
}

// UnexpectedStoreBehavior is the action to do in GetAllTiKVStores when a
// non-TiKV store (e.g. TiFlash store) is found.
type UnexpectedStoreBehavior uint8

const (
// ErrorOnTiFlash causes GetAllTiKVStores to return error when the store is
// found to be a TiFlash node.
ErrorOnTiFlash UnexpectedStoreBehavior = 0
// SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to
// be a TiFlash node.
SkipTiFlash UnexpectedStoreBehavior = 1
)

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
func GetAllTiKVStores(
ctx context.Context,
pdClient pd.Client,
unexpectedStoreBehavior UnexpectedStoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return nil, err
}

// filter out all stores which are TiFlash.
j := 0
skipStore:
for _, store := range stores {
for _, label := range store.Labels {
if label.Key == "engine" && label.Value == "tiflash" {
if unexpectedStoreBehavior == SkipTiFlash {
continue skipStore
}
return nil, errors.Errorf(
"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
}
}
stores[j] = store
j++
}
return stores[:j], nil
}

// NewMgr creates a new Mgr.
func NewMgr(
ctx context.Context,
g glue.Glue,
pdAddrs string,
storage tikv.Storage,
tlsConf *tls.Config,
securityOption pd.SecurityOption) (*Mgr, error) {
securityOption pd.SecurityOption,
unexpectedStoreBehavior UnexpectedStoreBehavior,
) (*Mgr, error) {
addrs := strings.Split(pdAddrs, ",")

failure := errors.Errorf("pd address (%s) has wrong format", pdAddrs)
Expand Down Expand Up @@ -143,7 +190,7 @@ func NewMgr(
log.Info("new mgr", zap.String("pdAddrs", pdAddrs))

// Check live tikv.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := GetAllTiKVStores(ctx, pdClient, unexpectedStoreBehavior)
if err != nil {
log.Error("fail to get store", zap.Error(err))
return nil, err
Expand Down
88 changes: 88 additions & 0 deletions pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/statistics"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -149,3 +150,90 @@ func (s *testClientSuite) TestRegionCount(c *C) {
c.Assert(err, IsNil)
c.Assert(resp, Equals, 2)
}

type fakePDClient struct {
pd.Client
stores []*metapb.Store
}

func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
return append([]*metapb.Store{}, fpdc.stores...), nil
}

func (s *testClientSuite) TestGetAllTiKVStores(c *C) {
testCases := []struct {
stores []*metapb.Store
unexpectedStoreBehavior UnexpectedStoreBehavior
expectedStores map[uint64]int
expectedError string
}{
{
stores: []*metapb.Store{
{Id: 1},
},
unexpectedStoreBehavior: SkipTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
},
unexpectedStoreBehavior: ErrorOnTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
},
unexpectedStoreBehavior: SkipTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
},
unexpectedStoreBehavior: ErrorOnTiFlash,
expectedError: "cannot restore to a cluster with active TiFlash stores.*",
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
{Id: 3},
{Id: 4, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tikv"}}},
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
unexpectedStoreBehavior: SkipTiFlash,
expectedStores: map[uint64]int{1: 1, 3: 1, 4: 1, 6: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
{Id: 3},
{Id: 4, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tikv"}}},
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
unexpectedStoreBehavior: ErrorOnTiFlash,
expectedError: "cannot restore to a cluster with active TiFlash stores.*",
},
}

for _, testCase := range testCases {
pdClient := fakePDClient{stores: testCase.stores}
stores, err := GetAllTiKVStores(context.Background(), pdClient, testCase.unexpectedStoreBehavior)
if len(testCase.expectedError) != 0 {
c.Assert(err, ErrorMatches, testCase.expectedError)
continue
}
foundStores := make(map[uint64]int)
for _, store := range stores {
foundStores[store.Id]++
}
c.Assert(foundStores, DeepEquals, testCase.expectedStores)
}
}
5 changes: 3 additions & 2 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/checksum"
"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
Expand Down Expand Up @@ -261,7 +262,7 @@ func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {

func (rc *Client) setSpeedLimit() error {
if !rc.hasSpeedLimited && rc.rateLimit != 0 {
stores, err := rc.pdClient.GetAllStores(rc.ctx, pd.WithExcludeTombstone())
stores, err := conn.GetAllTiKVStores(rc.ctx, rc.pdClient, conn.ErrorOnTiFlash)
if err != nil {
return err
}
Expand Down Expand Up @@ -345,7 +346,7 @@ func (rc *Client) SwitchToNormalMode(ctx context.Context) error {
}

func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMode) error {
stores, err := rc.pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.ErrorOnTiFlash)
if err != nil {
return errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
Expand Down Expand Up @@ -101,7 +102,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
if err != nil {
return err
}
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/spf13/pflag"

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/storage"
Expand Down Expand Up @@ -89,7 +90,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *BackupRaw
if err != nil {
return err
}
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.SkipTiFlash)
if err != nil {
return err
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,13 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
}

// newMgr creates a new mgr at the given PD address.
func newMgr(ctx context.Context, g glue.Glue, pds []string, tlsConfig TLSConfig) (*conn.Mgr, error) {
func newMgr(
ctx context.Context,
g glue.Glue,
pds []string,
tlsConfig TLSConfig,
unexpectedStoreBehavior conn.UnexpectedStoreBehavior,
) (*conn.Mgr, error) {
var (
tlsConf *tls.Config
err error
Expand All @@ -234,7 +240,7 @@ func newMgr(ctx context.Context, g glue.Glue, pds []string, tlsConfig TLSConfig)
if err != nil {
return nil, err
}
return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage), tlsConf, securityOption)
return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage), tlsConf, securityOption, unexpectedStoreBehavior)
}

// GetStorage gets the storage backend from the config.
Expand Down
2 changes: 1 addition & 1 deletion pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, conn.ErrorOnTiFlash)
if err != nil {
return err
}
Expand Down

0 comments on commit 6b88e51

Please sign in to comment.