Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snap_restore: resend recover_region while there are TiKV restarts #45361

Merged
merged 10 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Mgr struct {
}

func GetAllTiKVStoresWithRetry(ctx context.Context,
pdClient pd.Client,
pdClient util.StoreMeta,
storeBehavior util.StoreBehavior,
) ([]*metapb.Store, error) {
stores := make([]*metapb.Store, 0)
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/conn/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@ const (
TiFlashOnly StoreBehavior = 2
)

// StoreMeta is the required interface for a watcher.
// It is stripped from pd.Client so callers only need the store-listing part.
type StoreMeta interface {
	// GetAllStores gets all stores from pd.
	// The store may expire later. Caller is responsible for caching and taking care
	// of store change.
	GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error)
}

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
func GetAllTiKVStores(
ctx context.Context,
pdClient pd.Client,
pdClient StoreMeta,
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_library(
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//br/pkg/utils/storewatch",
"//br/pkg/version",
"//config",
"//ddl",
Expand Down
117 changes: 85 additions & 32 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import (
"context"
"io"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
Expand All @@ -12,7 +13,9 @@
"github.com/pingcap/tidb/br/pkg/common"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/storewatch"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/util/mathutil"
tikvstore "github.com/tikv/client-go/v2/kv"
Expand Down Expand Up @@ -48,6 +51,9 @@
return totalRegions, errors.Trace(err)
}

// Once TiKV shuts down and then reboots, it may be left with no leader because of the recovery mode.
// This watcher will re-trigger `RecoveryRegions` for those stores.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This wathcher will retrigger `RecoveryRegions` for those stores.
// This watcher will re-trigger `RecoveryRegions` for those stores.

recovery.SpawnTiKVShutDownWatchers(ctx)

Check warning on line 56 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L56

Added line #L56 was not covered by tests
if err := recovery.RecoverRegions(ctx); err != nil {
return totalRegions, errors.Trace(err)
}
Expand Down Expand Up @@ -213,6 +219,39 @@
return len(regions)
}

func (recovery *Recovery) RecoverRegionOfStore(ctx context.Context, storeID uint64, plan []*recovpb.RecoverRegionRequest) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (recovery *Recovery) RecoverRegionOfStore(ctx context.Context, storeID uint64, plan []*recovpb.RecoverRegionRequest) error {
func (recovery *Recovery) RecoverRegionsOfStore(ctx context.Context, storeID uint64, plan []*recovpb.RecoverRegionRequest) error {

storeAddr := getStoreAddress(recovery.allStores, storeID)
recoveryClient, conn, err := recovery.newRecoveryClient(ctx, storeAddr)
if err != nil {
log.Error("create tikv client failed", zap.Uint64("store id", storeID))
return errors.Trace(err)
}
defer conn.Close()
log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeID))
stream, err := recoveryClient.RecoverRegion(ctx)
if err != nil {
log.Error("create recover region failed", zap.Uint64("store id", storeID))
return errors.Trace(err)
}

Check warning on line 235 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L222-L235

Added lines #L222 - L235 were not covered by tests

// for a TiKV, send the stream
for _, s := range plan {
if err = stream.Send(s); err != nil {
log.Error("send recover region failed", zap.Error(err))
return errors.Trace(err)
}

Check warning on line 242 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L238-L242

Added lines #L238 - L242 were not covered by tests
}

reply, err := stream.CloseAndRecv()
if err != nil {
log.Error("close the stream failed")
return errors.Trace(err)
}
recovery.progress.Inc()
log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId()))
return nil

Check warning on line 252 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L245-L252

Added lines #L245 - L252 were not covered by tests
}

// RecoverRegions sends the recovery plan to recover regions (force leader etc.).
// The plan is sent only to the TiKVs that have regions which need to be recovered.
func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
Expand All @@ -224,46 +263,60 @@
if err := ectx.Err(); err != nil {
break
}
storeId := storeId
plan := plan

Check warning on line 267 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L266-L267

Added lines #L266 - L267 were not covered by tests

storeAddr := getStoreAddress(recovery.allStores, storeId)
recoveryPlan := plan
recoveryStoreId := storeId
workers.ApplyOnErrorGroup(eg, func() error {
recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
if err != nil {
log.Error("create tikv client failed", zap.Uint64("store id", recoveryStoreId))
return errors.Trace(err)
}
defer conn.Close()
log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", recoveryStoreId))
stream, err := recoveryClient.RecoverRegion(ectx)
if err != nil {
log.Error("create recover region failed", zap.Uint64("store id", recoveryStoreId))
return errors.Trace(err)
}

// for a TiKV, send the stream
for _, s := range recoveryPlan {
if err = stream.Send(s); err != nil {
log.Error("send recover region failed", zap.Error(err))
return errors.Trace(err)
}
}

reply, err := stream.CloseAndRecv()
if err != nil {
log.Error("close the stream failed")
return errors.Trace(err)
}
recovery.progress.Inc()
log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId()))
return nil
return recovery.RecoverRegionOfStore(ectx, storeId, plan)

Check warning on line 270 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L270

Added line #L270 was not covered by tests
})
}
// Wait for all TiKV instances force leader and wait apply to last log.
return eg.Wait()
}

func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) {
rebootStores := map[uint64]struct{}{}
cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) {
log.Info("Store reboot detected, will regenerate leaders.", zap.Uint64("id", s.GetId()))
rebootStores[s.Id] = struct{}{}
}), storewatch.WithOnDisconnect(func(s *metapb.Store) {
log.Warn("A store disconnected.", zap.Uint64("id", s.GetId()), zap.String("addr", s.GetAddress()))
}), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) {
log.Info("Start to observing the state of store.", zap.Uint64("id", s.GetId()))
}))
watcher := storewatch.New(recovery.mgr.PDClient(), cb)
tick := time.NewTicker(30 * time.Second)
mainLoop := func() {
for {
select {
case <-ctx.Done():
return
case <-tick.C:
err := watcher.Step(ctx)
if err != nil {
log.Warn("Failed to step watcher.", logutil.ShortError(err))
}
for id := range rebootStores {
plan, ok := recovery.RecoveryPlan[id]
if !ok {
log.Warn("Store reboot detected, but no recovery plan found.", zap.Uint64("id", id))
continue

Check warning on line 303 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L277-L303

Added lines #L277 - L303 were not covered by tests
}
err := recovery.RecoverRegionOfStore(ctx, id, plan)
if err != nil {
log.Warn("Store reboot detected, but failed to regenerate leader.", zap.Uint64("id", id), logutil.ShortError(err))
continue

Check warning on line 308 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L305-L308

Added lines #L305 - L308 were not covered by tests
}
log.Info("Succeed to reload the leader in store.", zap.Uint64("id", id))
delete(rebootStores, id)

Check warning on line 311 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L310-L311

Added lines #L310 - L311 were not covered by tests
}
}
}
}

go mainLoop()

Check warning on line 317 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L317

Added line #L317 was not covered by tests
}

// WaitApply send wait apply to all tikv ensure all region peer apply log into the last
func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
eg, ectx := errgroup.WithContext(ctx)
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/utils/storewatch/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "storewatch",
srcs = ["watching.go"],
importpath = "github.com/pingcap/tidb/br/pkg/utils/storewatch",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/conn",
"//br/pkg/conn/util",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/metapb",
],
)

go_test(
name = "storewatch_test",
timeout = "short",
srcs = ["watching_test.go"],
flaky = True,
shard_count = 3,
deps = [
":storewatch",
"//br/pkg/conn/util",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
],
)
136 changes: 136 additions & 0 deletions br/pkg/utils/storewatch/watching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Package storewatch provides a `Watcher` type which allows
// the user to listen to the lifecycle events of stores.
package storewatch

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/conn/util"
)

// Callback will be called when a supported store lifecycle event triggers.
type Callback interface {
	// OnNewStoreRegistered is called the first time a store is observed.
	OnNewStoreRegistered(store *metapb.Store)
	// OnDisconnect is called when a store transitions from Up to Offline.
	OnDisconnect(store *metapb.Store)
	// OnReboot is called when a store's start timestamp has changed.
	OnReboot(store *metapb.Store)
}

// DynCallback is a function based callback set.
// Any hook left nil is simply skipped when its event fires.
type DynCallback struct {
	// onNewStoreRegistered is invoked for stores seen for the first time.
	onNewStoreRegistered func(*metapb.Store)
	// onDisconnect is invoked when a store goes from Up to Offline.
	onDisconnect func(*metapb.Store)
	// onReboot is invoked when a store's start timestamp changes.
	onReboot func(*metapb.Store)
}

// OnNewStoreRegistered forwards the event to the registered hook, if any.
func (cb *DynCallback) OnNewStoreRegistered(store *metapb.Store) {
	if cb.onNewStoreRegistered == nil {
		return
	}
	cb.onNewStoreRegistered(store)
}

// OnDisconnect forwards the disconnect event to the registered hook, if any.
func (cb *DynCallback) OnDisconnect(store *metapb.Store) {
	if cb.onDisconnect == nil {
		return
	}
	cb.onDisconnect(store)
}

// OnReboot forwards the reboot event to the registered hook, if any.
func (cb *DynCallback) OnReboot(store *metapb.Store) {
	if cb.onReboot == nil {
		return
	}
	cb.onReboot(store)
}

// DynCallbackOpt is the option for DynCallback.
// Pass options to MakeCallback to install the individual hooks.
type DynCallbackOpt func(*DynCallback)

// WithOnNewStoreRegistered installs f as the new-store hook of the callback.
func WithOnNewStoreRegistered(f func(*metapb.Store)) DynCallbackOpt {
	return func(cb *DynCallback) { cb.onNewStoreRegistered = f }
}

// WithOnDisconnect installs f as the disconnect hook of the callback.
func WithOnDisconnect(f func(*metapb.Store)) DynCallbackOpt {
	return func(cb *DynCallback) { cb.onDisconnect = f }
}

// WithOnReboot installs f as the reboot hook of the callback.
func WithOnReboot(f func(*metapb.Store)) DynCallbackOpt {
	return func(cb *DynCallback) { cb.onReboot = f }
}

// MakeCallback creates a callback with the given options.
// Allowed options: WithOnNewStoreRegistered, WithOnDisconnect, WithOnReboot.
func MakeCallback(opts ...DynCallbackOpt) Callback {
	var cb DynCallback
	for _, apply := range opts {
		apply(&cb)
	}
	return &cb
}

// Watcher watches the lifetime of stores.
// generally it should be advanced by calling the `Step` call.
type Watcher struct {
	// cli fetches the current store list from PD.
	cli util.StoreMeta
	// cb receives the detected store lifecycle events.
	cb Callback

	// lastStores is the snapshot of stores seen by the previous Step,
	// keyed by store ID.
	lastStores map[uint64]*metapb.Store
}

// New creates a Watcher that reads stores via cli and reports lifecycle
// events to cb. No store is considered known until the first Step call.
func New(cli util.StoreMeta, cb Callback) *Watcher {
	return &Watcher{
		cli:        cli,
		cb:         cb,
		lastStores: make(map[uint64]*metapb.Store),
	}
}

func (w *Watcher) Step(ctx context.Context) error {
liveStores, err := conn.GetAllTiKVStoresWithRetry(ctx, w.cli, util.SkipTiFlash)
if err != nil {
return errors.Annotate(err, "failed to update store list")
}

Check warning on line 104 in br/pkg/utils/storewatch/watching.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/utils/storewatch/watching.go#L103-L104

Added lines #L103 - L104 were not covered by tests
recorded := map[uint64]struct{}{}
for _, store := range liveStores {
w.updateStore(store)
recorded[store.GetId()] = struct{}{}
}
w.retain(recorded)
return nil
}

// updateStore records newStore as the latest snapshot of its store and fires
// the callback matching the transition observed since the previous snapshot.
func (w *Watcher) updateStore(newStore *metapb.Store) {
	id := newStore.GetId()
	prev, seen := w.lastStores[id]
	w.lastStores[id] = newStore
	if !seen {
		w.cb.OnNewStoreRegistered(newStore)
		return
	}
	if prev.GetState() == metapb.StoreState_Up && newStore.GetState() == metapb.StoreState_Offline {
		w.cb.OnDisconnect(newStore)
	}
	if prev.StartTimestamp != newStore.StartTimestamp {
		w.cb.OnReboot(newStore)
	}
}

func (w *Watcher) retain(storeSet map[uint64]struct{}) {
for id := range w.lastStores {
if _, ok := storeSet[id]; !ok {
delete(w.lastStores, id)
}

Check warning on line 134 in br/pkg/utils/storewatch/watching.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/utils/storewatch/watching.go#L133-L134

Added lines #L133 - L134 were not covered by tests
}
}
Loading
Loading