diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go
index 54be8ab7987ce..a8cdc9f4e9c6c 100644
--- a/br/pkg/conn/conn.go
+++ b/br/pkg/conn/conn.go
@@ -70,7 +70,7 @@ type Mgr struct {
 }
 
 func GetAllTiKVStoresWithRetry(ctx context.Context,
-	pdClient pd.Client,
+	pdClient util.StoreMeta,
 	storeBehavior util.StoreBehavior,
 ) ([]*metapb.Store, error) {
 	stores := make([]*metapb.Store, 0)
diff --git a/br/pkg/conn/util/util.go b/br/pkg/conn/util/util.go
index 0479030c1c5ac..58f400a231d25 100644
--- a/br/pkg/conn/util/util.go
+++ b/br/pkg/conn/util/util.go
@@ -28,11 +28,20 @@ const (
 	TiFlashOnly StoreBehavior = 2
 )
 
+// StoreMeta is the required interface for a watcher.
+// It is stripped from pd.Client.
+type StoreMeta interface {
+	// GetAllStores gets all stores from PD.
+	// The stores may expire later. The caller is responsible for caching and
+	// taking care of store changes.
+	GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error)
+}
+
 // GetAllTiKVStores returns all TiKV stores registered to the PD client. The
 // stores must not be a tombstone and must never contain a label `engine=tiflash`.
 func GetAllTiKVStores(
 	ctx context.Context,
-	pdClient pd.Client,
+	pdClient StoreMeta,
 	storeBehavior StoreBehavior,
 ) ([]*metapb.Store, error) {
 	// get all live stores.
diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel
index 7d0071047d8e8..ee2c23f177bbb 100644
--- a/br/pkg/restore/BUILD.bazel
+++ b/br/pkg/restore/BUILD.bazel
@@ -43,6 +43,7 @@ go_library(
         "//br/pkg/summary",
         "//br/pkg/utils",
         "//br/pkg/utils/iter",
+        "//br/pkg/utils/storewatch",
         "//br/pkg/version",
         "//config",
         "//ddl",
diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go
index 265126b9411af..880cc49a9b904 100644
--- a/br/pkg/restore/data.go
+++ b/br/pkg/restore/data.go
@@ -4,6 +4,7 @@ package restore
 import (
 	"context"
 	"io"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -12,7 +13,9 @@ import (
 	"github.com/pingcap/tidb/br/pkg/common"
 	"github.com/pingcap/tidb/br/pkg/conn"
 	"github.com/pingcap/tidb/br/pkg/glue"
+	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/utils"
+	"github.com/pingcap/tidb/br/pkg/utils/storewatch"
 	"github.com/pingcap/tidb/ddl"
 	"github.com/pingcap/tidb/util/mathutil"
 	tikvstore "github.com/tikv/client-go/v2/kv"
@@ -48,6 +51,9 @@ func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Stor
 		return totalRegions, errors.Trace(err)
 	}
 
+	// Once a TiKV node shuts down and then reboots, it may be left with no leaders because of the recovery mode.
+	// This watcher will retrigger `RecoverRegionOfStore` for those stores.
+	recovery.SpawnTiKVShutDownWatchers(ctx)
 	if err := recovery.RecoverRegions(ctx); err != nil {
 		return totalRegions, errors.Trace(err)
 	}
@@ -213,6 +219,39 @@ func (recovery *Recovery) GetTotalRegions() int {
 	return len(regions)
 }
 
+func (recovery *Recovery) RecoverRegionOfStore(ctx context.Context, storeID uint64, plan []*recovpb.RecoverRegionRequest) error {
+	storeAddr := getStoreAddress(recovery.allStores, storeID)
+	recoveryClient, conn, err := recovery.newRecoveryClient(ctx, storeAddr)
+	if err != nil {
+		log.Error("create tikv client failed", zap.Uint64("store id", storeID))
+		return errors.Trace(err)
+	}
+	defer conn.Close()
+	log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeID))
+	stream, err := recoveryClient.RecoverRegion(ctx)
+	if err != nil {
+		log.Error("create recover region failed", zap.Uint64("store id", storeID))
+		return errors.Trace(err)
+	}
+
+	// for a TiKV, send the stream
+	for _, s := range plan {
+		if err = stream.Send(s); err != nil {
+			log.Error("send recover region failed", zap.Error(err))
+			return errors.Trace(err)
+		}
+	}
+
+	reply, err := stream.CloseAndRecv()
+	if err != nil {
+		log.Error("close the stream failed")
+		return errors.Trace(err)
+	}
+	recovery.progress.Inc()
+	log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId()))
+	return nil
+}
+
 // RecoverRegions send the recovery plan to recovery region (force leader etc)
 // only tikvs have regions whose have to recover be sent
 func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
@@ -224,46 +263,60 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
 		if err := ectx.Err(); err != nil {
 			break
 		}
+		storeId := storeId
+		plan := plan
-		storeAddr := getStoreAddress(recovery.allStores, storeId)
-		recoveryPlan := plan
-		recoveryStoreId := storeId
 		workers.ApplyOnErrorGroup(eg, func() error {
-			recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
-			if err != nil {
-				log.Error("create tikv client failed", zap.Uint64("store id", recoveryStoreId))
-				return errors.Trace(err)
-			}
-			defer conn.Close()
-			log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", recoveryStoreId))
-			stream, err := recoveryClient.RecoverRegion(ectx)
-			if err != nil {
-				log.Error("create recover region failed", zap.Uint64("store id", recoveryStoreId))
-				return errors.Trace(err)
-			}
-
-			// for a TiKV, send the stream
-			for _, s := range recoveryPlan {
-				if err = stream.Send(s); err != nil {
-					log.Error("send recover region failed", zap.Error(err))
-					return errors.Trace(err)
-				}
-			}
-
-			reply, err := stream.CloseAndRecv()
-			if err != nil {
-				log.Error("close the stream failed")
-				return errors.Trace(err)
-			}
-			recovery.progress.Inc()
-			log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId()))
-			return nil
+			return recovery.RecoverRegionOfStore(ectx, storeId, plan)
 		})
 	}
 	// Wait for all TiKV instances force leader and wait apply to last log.
 	return eg.Wait()
 }
 
+func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) {
+	rebootStores := map[uint64]struct{}{}
+	cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) {
+		log.Info("Store reboot detected, will regenerate leaders.", zap.Uint64("id", s.GetId()))
+		rebootStores[s.Id] = struct{}{}
+	}), storewatch.WithOnDisconnect(func(s *metapb.Store) {
+		log.Warn("A store disconnected.", zap.Uint64("id", s.GetId()), zap.String("addr", s.GetAddress()))
+	}), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) {
+		log.Info("Start observing the state of the store.", zap.Uint64("id", s.GetId()))
+	}))
+	watcher := storewatch.New(recovery.mgr.PDClient(), cb)
+	tick := time.NewTicker(30 * time.Second)
+	mainLoop := func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-tick.C:
+				err := watcher.Step(ctx)
+				if err != nil {
+					log.Warn("Failed to step watcher.", logutil.ShortError(err))
+				}
+				for id := range rebootStores {
+					plan, ok := recovery.RecoveryPlan[id]
+					if !ok {
+						log.Warn("Store reboot detected, but no recovery plan found.", zap.Uint64("id", id))
+						continue
+					}
+					err := recovery.RecoverRegionOfStore(ctx, id, plan)
+					if err != nil {
+						log.Warn("Store reboot detected, but failed to regenerate leader.", zap.Uint64("id", id), logutil.ShortError(err))
+						continue
+					}
+					log.Info("Successfully reloaded the leaders in the store.", zap.Uint64("id", id))
+					delete(rebootStores, id)
+				}
+			}
+		}
+	}
+
+	go mainLoop()
+}
+
 // WaitApply send wait apply to all tikv ensure all region peer apply log into the last
 func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
 	eg, ectx := errgroup.WithContext(ctx)
diff --git a/br/pkg/utils/storewatch/BUILD.bazel b/br/pkg/utils/storewatch/BUILD.bazel
new file mode 100644
index 0000000000000..640b7446d9517
--- /dev/null
+++ b/br/pkg/utils/storewatch/BUILD.bazel
@@ -0,0 +1,29 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "storewatch",
+    srcs = ["watching.go"],
+    importpath = "github.com/pingcap/tidb/br/pkg/utils/storewatch",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//br/pkg/conn",
+        "//br/pkg/conn/util",
+        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_kvproto//pkg/metapb",
+    ],
+)
+
+go_test(
+    name = "storewatch_test",
+    timeout = "short",
+    srcs = ["watching_test.go"],
+    flaky = True,
+    shard_count = 3,
+    deps = [
+        ":storewatch",
+        "//br/pkg/conn/util",
+        "@com_github_pingcap_kvproto//pkg/metapb",
+        "@com_github_stretchr_testify//require",
+        "@com_github_tikv_pd_client//:client",
+    ],
+)
diff --git a/br/pkg/utils/storewatch/watching.go b/br/pkg/utils/storewatch/watching.go
new file mode 100644
index 0000000000000..67135803fecaf
--- /dev/null
+++ b/br/pkg/utils/storewatch/watching.go
@@ -0,0 +1,136 @@
+// Package storewatch provides a `Watcher` type which allows
+// the user to listen to the lifetime events of stores.
+package storewatch
+
+import (
+	"context"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/tidb/br/pkg/conn"
+	"github.com/pingcap/tidb/br/pkg/conn/util"
+)
+
+// Callback will be called when a supported event is triggered.
+type Callback interface {
+	OnNewStoreRegistered(store *metapb.Store)
+	OnDisconnect(store *metapb.Store)
+	OnReboot(store *metapb.Store)
+}
+
+// DynCallback is a function-based callback set.
+type DynCallback struct {
+	onNewStoreRegistered func(*metapb.Store)
+	onDisconnect         func(*metapb.Store)
+	onReboot             func(*metapb.Store)
+}
+
+// OnNewStoreRegistered will be called once a new store is added to be watched.
+func (cb *DynCallback) OnNewStoreRegistered(store *metapb.Store) {
+	if cb.onNewStoreRegistered != nil {
+		cb.onNewStoreRegistered(store)
+	}
+}
+
+// OnDisconnect will be called once the store is disconnected.
+func (cb *DynCallback) OnDisconnect(store *metapb.Store) {
+	if cb.onDisconnect != nil {
+		cb.onDisconnect(store)
+	}
+}
+
+// OnReboot will be called once the store is rebooted.
+func (cb *DynCallback) OnReboot(store *metapb.Store) {
+	if cb.onReboot != nil {
+		cb.onReboot(store)
+	}
+}
+
+// DynCallbackOpt is the option for DynCallback.
+type DynCallbackOpt func(*DynCallback)
+
+// WithOnNewStoreRegistered adds a hook to the callback.
+func WithOnNewStoreRegistered(f func(*metapb.Store)) DynCallbackOpt {
+	return func(cb *DynCallback) {
+		cb.onNewStoreRegistered = f
+	}
+}
+
+// WithOnDisconnect adds a hook to the callback.
+func WithOnDisconnect(f func(*metapb.Store)) DynCallbackOpt {
+	return func(cb *DynCallback) {
+		cb.onDisconnect = f
+	}
+}
+
+// WithOnReboot adds a hook to the callback.
+func WithOnReboot(f func(*metapb.Store)) DynCallbackOpt {
+	return func(cb *DynCallback) {
+		cb.onReboot = f
+	}
+}
+
+// MakeCallback creates a callback with the given options.
+// Allowed options: WithOnNewStoreRegistered, WithOnDisconnect, WithOnReboot.
+func MakeCallback(opts ...DynCallbackOpt) Callback {
+	cb := &DynCallback{}
+	for _, opt := range opts {
+		opt(cb)
+	}
+	return cb
+}
+
+// Watcher watches the lifetime of stores.
+// Generally it should be advanced by calling `Step`.
+type Watcher struct {
+	cli util.StoreMeta
+	cb  Callback
+
+	lastStores map[uint64]*metapb.Store
+}
+
+func New(cli util.StoreMeta, cb Callback) *Watcher {
+	return &Watcher{
+		cli:        cli,
+		cb:         cb,
+		lastStores: make(map[uint64]*metapb.Store),
+	}
+}
+
+func (w *Watcher) Step(ctx context.Context) error {
+	liveStores, err := conn.GetAllTiKVStoresWithRetry(ctx, w.cli, util.SkipTiFlash)
+	if err != nil {
+		return errors.Annotate(err, "failed to update store list")
+	}
+	recorded := map[uint64]struct{}{}
+	for _, store := range liveStores {
+		w.updateStore(store)
+		recorded[store.GetId()] = struct{}{}
+	}
+	w.retain(recorded)
+	return nil
+}
+
+// updateStore updates the last seen state of the store and calls the hooks as needed.
+func (w *Watcher) updateStore(newStore *metapb.Store) {
+	lastStore, ok := w.lastStores[newStore.GetId()]
+	w.lastStores[newStore.GetId()] = newStore
+	if !ok {
+		w.cb.OnNewStoreRegistered(newStore)
+		return
+	}
+	if lastStore.GetState() == metapb.StoreState_Up && newStore.GetState() == metapb.StoreState_Offline {
+		w.cb.OnDisconnect(newStore)
+	}
+	if lastStore.StartTimestamp != newStore.StartTimestamp {
+		w.cb.OnReboot(newStore)
+	}
+}
+
+func (w *Watcher) retain(storeSet map[uint64]struct{}) {
+	for id := range w.lastStores {
+		if _, ok := storeSet[id]; !ok {
+			delete(w.lastStores, id)
+		}
+	}
+}
diff --git a/br/pkg/utils/storewatch/watching_test.go b/br/pkg/utils/storewatch/watching_test.go
new file mode 100644
index 0000000000000..49ab4034bcfd7
--- /dev/null
+++ b/br/pkg/utils/storewatch/watching_test.go
@@ -0,0 +1,111 @@
+package storewatch_test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/tidb/br/pkg/conn/util"
+	"github.com/pingcap/tidb/br/pkg/utils/storewatch"
+	"github.com/stretchr/testify/require"
+	pd "github.com/tikv/pd/client"
+)
+
+type SequentialReturningStoreMeta struct {
+	sequence [][]*metapb.Store
+}
+
+func NewSequentialReturningStoreMeta(sequence [][]*metapb.Store) util.StoreMeta {
+	return &SequentialReturningStoreMeta{sequence: sequence}
+}
+
+func (s *SequentialReturningStoreMeta) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
+	if len(s.sequence) == 0 {
+		return nil, fmt.Errorf("too many calls to `GetAllStores` in test")
+	}
+	stores := s.sequence[0]
+	s.sequence = s.sequence[1:]
+	return stores, nil
+}
+
+func TestOnRegister(t *testing.T) {
+	// A sequence of store states from which we should believe a new store has been registered.
+	seq := NewSequentialReturningStoreMeta([][]*metapb.Store{
+		{
+			{
+				Id:    1,
+				State: metapb.StoreState_Up,
+			},
+		},
+	})
+	callBackCalled := false
+	callback := storewatch.MakeCallback(storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) { callBackCalled = true }))
+	ctx := context.Background()
+
+	watcher := storewatch.New(seq, callback)
+	require.NoError(t, watcher.Step(ctx))
+	require.True(t, callBackCalled)
+}
+
+func TestOnOffline(t *testing.T) {
+	// A sequence of store states from which we should believe the store is offline.
+	seq := NewSequentialReturningStoreMeta([][]*metapb.Store{
+		{
+			{
+				Id:    1,
+				State: metapb.StoreState_Up,
+			},
+		},
+		{
+			{
+				Id:    1,
+				State: metapb.StoreState_Offline,
+			},
+		},
+	})
+	callBackCalled := false
+	callback := storewatch.MakeCallback(storewatch.WithOnDisconnect(func(s *metapb.Store) { callBackCalled = true }))
+	ctx := context.Background()
+
+	watcher := storewatch.New(seq, callback)
+	require.NoError(t, watcher.Step(ctx))
+	require.NoError(t, watcher.Step(ctx))
+	require.True(t, callBackCalled)
+}
+
+func TestOnReboot(t *testing.T) {
+	// A sequence of store states from which we should believe the store has rebooted.
+	seq := NewSequentialReturningStoreMeta([][]*metapb.Store{
+		{
+			{
+				Id:             1,
+				State:          metapb.StoreState_Up,
+				StartTimestamp: 1,
+			},
+		},
+		{
+			{
+				Id:             1,
+				State:          metapb.StoreState_Offline,
+				StartTimestamp: 1,
+			},
+		},
+		{
+			{
+				Id:             1,
+				State:          metapb.StoreState_Up,
+				StartTimestamp: 2,
+			},
+		},
+	})
+	callBackCalled := false
+	callback := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) { callBackCalled = true }))
+	ctx := context.Background()
+
+	watcher := storewatch.New(seq, callback)
+	require.NoError(t, watcher.Step(ctx))
+	require.NoError(t, watcher.Step(ctx))
+	require.NoError(t, watcher.Step(ctx))
+	require.True(t, callBackCalled)
+}
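
Reviewer note (not part of the patch): the sketch below shows how a caller outside RecoverData might drive the new storewatch package, mirroring what SpawnTiKVShutDownWatchers does above. It is a minimal sketch under assumptions: the package name, the watchStores helper, and the log wording are hypothetical; the 30-second polling interval comes from the patch, and pd.Client is usable here because it already provides the GetAllStores method required by util.StoreMeta.

// Package storewatchexample is a hypothetical example package, not part of the patch.
package storewatchexample

import (
	"context"
	"time"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/utils/storewatch"
	pd "github.com/tikv/pd/client"
	"go.uber.org/zap"
)

// watchStores polls PD periodically and reacts to store lifetime events.
func watchStores(ctx context.Context, pdCli pd.Client) {
	// Register hooks only for the events we care about; unset hooks are no-ops.
	cb := storewatch.MakeCallback(
		storewatch.WithOnReboot(func(s *metapb.Store) {
			log.Info("store rebooted", zap.Uint64("id", s.GetId()))
		}),
		storewatch.WithOnDisconnect(func(s *metapb.Store) {
			log.Warn("store went offline", zap.Uint64("id", s.GetId()))
		}),
	)
	// pd.Client satisfies util.StoreMeta because it implements GetAllStores.
	watcher := storewatch.New(pdCli, cb)
	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			// Each Step fetches the current store list once and fires the
			// hooks for any registration, disconnect, or reboot it observes.
			if err := watcher.Step(ctx); err != nil {
				log.Warn("failed to step store watcher", zap.Error(err))
			}
		}
	}
}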