Skip to content

Commit

Permalink
lightning: specify key range when switch mode (pingcap#45482)
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD authored Jul 31, 2023
1 parent 697652f commit cede736
Show file tree
Hide file tree
Showing 25 changed files with 288 additions and 108 deletions.
111 changes: 88 additions & 23 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"database/sql"
"encoding/hex"
"io"
"math"
"net"
Expand Down Expand Up @@ -420,33 +421,35 @@ type BackendConfig struct {
MaxOpenFiles int
KeyspaceName string
// the scope when pause PD schedulers.
PausePDSchedulerScope config.PausePDSchedulerScope
ResourceGroupName string
PausePDSchedulerScope config.PausePDSchedulerScope
ResourceGroupName string
RaftKV2SwitchModeDuration time.Duration
}

// NewBackendConfig creates a new BackendConfig.
func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resourceGroupName string) BackendConfig {
func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resourceGroupName string, raftKV2SwitchModeDuration time.Duration) BackendConfig {
return BackendConfig{
PDAddr: cfg.TiDB.PdAddr,
LocalStoreDir: cfg.TikvImporter.SortedKVDir,
MaxConnPerStore: cfg.TikvImporter.RangeConcurrency,
ConnCompressType: cfg.TikvImporter.CompressKVPairs,
WorkerConcurrency: cfg.TikvImporter.RangeConcurrency * 2,
KVWriteBatchSize: int64(cfg.TikvImporter.SendKVSize),
RegionSplitBatchSize: cfg.TikvImporter.RegionSplitBatchSize,
RegionSplitConcurrency: cfg.TikvImporter.RegionSplitConcurrency,
CheckpointEnabled: cfg.Checkpoint.Enable,
MemTableSize: int(cfg.TikvImporter.EngineMemCacheSize),
LocalWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize),
ShouldCheckTiKV: cfg.App.CheckRequirements,
DupeDetectEnabled: cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone,
DuplicateDetectOpt: DupDetectOpt{ReportErrOnDup: cfg.TikvImporter.DuplicateResolution == config.DupeResAlgErr},
StoreWriteBWLimit: int(cfg.TikvImporter.StoreWriteBWLimit),
ShouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0,
MaxOpenFiles: maxOpenFiles,
KeyspaceName: keyspaceName,
PausePDSchedulerScope: cfg.TikvImporter.PausePDSchedulerScope,
ResourceGroupName: resourceGroupName,
PDAddr: cfg.TiDB.PdAddr,
LocalStoreDir: cfg.TikvImporter.SortedKVDir,
MaxConnPerStore: cfg.TikvImporter.RangeConcurrency,
ConnCompressType: cfg.TikvImporter.CompressKVPairs,
WorkerConcurrency: cfg.TikvImporter.RangeConcurrency * 2,
KVWriteBatchSize: int64(cfg.TikvImporter.SendKVSize),
RegionSplitBatchSize: cfg.TikvImporter.RegionSplitBatchSize,
RegionSplitConcurrency: cfg.TikvImporter.RegionSplitConcurrency,
CheckpointEnabled: cfg.Checkpoint.Enable,
MemTableSize: int(cfg.TikvImporter.EngineMemCacheSize),
LocalWriterMemCacheSize: int64(cfg.TikvImporter.LocalWriterMemCacheSize),
ShouldCheckTiKV: cfg.App.CheckRequirements,
DupeDetectEnabled: cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone,
DuplicateDetectOpt: DupDetectOpt{ReportErrOnDup: cfg.TikvImporter.DuplicateResolution == config.DupeResAlgErr},
StoreWriteBWLimit: int(cfg.TikvImporter.StoreWriteBWLimit),
ShouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0,
MaxOpenFiles: maxOpenFiles,
KeyspaceName: keyspaceName,
PausePDSchedulerScope: cfg.TikvImporter.PausePDSchedulerScope,
ResourceGroupName: resourceGroupName,
RaftKV2SwitchModeDuration: raftKV2SwitchModeDuration,
}
}

Expand Down Expand Up @@ -1454,6 +1457,23 @@ func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, re
}()
}

if len(regionRanges) > 0 && local.BackendConfig.RaftKV2SwitchModeDuration > 0 {
log.FromContext(ctx).Info("switch import mode of ranges",
zap.String("startKey", hex.EncodeToString(regionRanges[0].start)),
zap.String("endKey", hex.EncodeToString(regionRanges[len(regionRanges)-1].end)))
subCtx, cancel := context.WithCancel(ctx)
defer cancel()

done, err := local.SwitchModeByKeyRanges(subCtx, regionRanges)
if err != nil {
return errors.Trace(err)
}
defer func() {
cancel()
<-done
}()
}

log.FromContext(ctx).Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("region ranges", len(regionRanges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))

Expand Down Expand Up @@ -1695,6 +1715,51 @@ func (local *Backend) LocalWriter(_ context.Context, cfg *backend.LocalWriterCon
return openLocalWriter(cfg, engine, local.tikvCodec, local.LocalWriterMemCacheSize, local.bufferPool.NewBuffer())
}

// SwitchModeByKeyRanges will switch tikv mode for regions in the specific key range for multirocksdb.
// This function will spawn a goroutine to keep switch mode periodically until the context is done.
// The return done channel is used to notify the caller that the background goroutine is exited.
func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []Range) (<-chan struct{}, error) {
switcher := NewTiKVModeSwitcher(local.tls, local.PDAddr, log.FromContext(ctx).Logger)
done := make(chan struct{})

keyRanges := make([]*sst.Range, 0, len(ranges))
for _, r := range ranges {
startKey := r.start
if len(r.start) > 0 {
startKey = codec.EncodeBytes(nil, r.start)
}
endKey := r.end
if len(r.end) > 0 {
endKey = codec.EncodeBytes(nil, r.end)
}
keyRanges = append(keyRanges, &sst.Range{
Start: startKey,
End: endKey,
})
}

go func() {
defer close(done)
ticker := time.NewTicker(local.BackendConfig.RaftKV2SwitchModeDuration)
defer ticker.Stop()
switcher.ToImportMode(ctx, keyRanges...)
loop:
for {
select {
case <-ctx.Done():
break loop
case <-ticker.C:
switcher.ToImportMode(ctx, keyRanges...)
}
}
// Use a new context to avoid the context is canceled by the caller.
recoverCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
switcher.ToNormalMode(recoverCtx, keyRanges...)
}()
return done, nil
}

func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, tikvCodec tikvclient.Codec, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
w := &Writer{
engine: engine,
Expand Down
16 changes: 8 additions & 8 deletions br/pkg/lightning/backend/local/tikv_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
// TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
type TiKVModeSwitcher interface {
// ToImportMode switches all TiKV nodes to Import mode.
ToImportMode(ctx context.Context)
ToImportMode(ctx context.Context, ranges ...*sstpb.Range)
// ToNormalMode switches all TiKV nodes to Normal mode.
ToNormalMode(ctx context.Context)
ToNormalMode(ctx context.Context, ranges ...*sstpb.Range)
}

// TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
Expand All @@ -47,15 +47,15 @@ func NewTiKVModeSwitcher(tls *common.TLS, pdAddr string, logger *zap.Logger) TiK
}
}

func (rc *switcher) ToImportMode(ctx context.Context) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Import)
func (rc *switcher) ToImportMode(ctx context.Context, ranges ...*sstpb.Range) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Import, ranges...)
}

func (rc *switcher) ToNormalMode(ctx context.Context) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Normal)
func (rc *switcher) ToNormalMode(ctx context.Context, ranges ...*sstpb.Range) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Normal, ranges...)
}

func (rc *switcher) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode) {
func (rc *switcher) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode, ranges ...*sstpb.Range) {
rc.logger.Info("switch tikv mode", zap.Stringer("mode", mode))

// It is fine if we miss some stores which did not switch to Import mode,
Expand All @@ -76,7 +76,7 @@ func (rc *switcher) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode) {
tls,
minState,
func(c context.Context, store *tikv.Store) error {
return tikv.SwitchMode(c, tls, store.Address, mode)
return tikv.SwitchMode(c, tls, store.Address, mode, ranges...)
},
)
}
25 changes: 25 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,3 +660,28 @@ func IsFunctionNotExistErr(err error, functionName string) bool {
(strings.Contains(err.Error(), "No database selected") ||
strings.Contains(err.Error(), fmt.Sprintf("%s does not exist", functionName)))
}

// IsRaftKV2 checks whether the raft-kv2 is enabled
func IsRaftKV2(ctx context.Context, db *sql.DB) (bool, error) {
var (
getRaftKvVersionSQL = "show config where type = 'tikv' and name = 'storage.engine'"
raftKv2 = "raft-kv2"
tp, instance, name, value string
)

rows, err := db.QueryContext(ctx, getRaftKvVersionSQL)
if err != nil {
return false, errors.Trace(err)
}
defer rows.Close()

for rows.Next() {
if err = rows.Scan(&tp, &instance, &name, &value); err != nil {
return false, errors.Trace(err)
}
if value == raftKv2 {
return true, nil
}
}
return false, rows.Err()
}
1 change: 1 addition & 0 deletions br/pkg/lightning/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/importer",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/backup",
"//br/pkg/errors",
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
Expand Down
23 changes: 22 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/backup"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
Expand Down Expand Up @@ -376,7 +377,15 @@ func NewImportControllerWithPauser(
}
}

backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName, p.ResourceGroupName)
isRaftKV2, err := common.IsRaftKV2(ctx, db)
if err != nil {
log.FromContext(ctx).Warn("check isRaftKV2 failed", zap.Error(err))
}
var raftKV2SwitchModeDuration time.Duration
if isRaftKV2 {
raftKV2SwitchModeDuration = cfg.Cron.SwitchMode.Duration
}
backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName, p.ResourceGroupName, raftKV2SwitchModeDuration)
backendObj, err = local.NewBackend(ctx, tls, backendConfig, regionSizeGetter)
if err != nil {
return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
Expand Down Expand Up @@ -1375,6 +1384,18 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s
}
}

func (rc *Controller) buildTablesRanges() []tidbkv.KeyRange {
var keyRanges []tidbkv.KeyRange
for _, dbInfo := range rc.dbInfos {
for _, tableInfo := range dbInfo.Tables {
if ranges, err := backup.BuildTableRanges(tableInfo.Core); err == nil {
keyRanges = append(keyRanges, ranges...)
}
}
}
return keyRanges
}

type checksumManagerKeyType struct{}

var checksumManagerKey checksumManagerKeyType
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ func CleanupMetas(ctx context.Context, cfg *config.Config, tableName string) err
}

// SwitchMode switches the mode of the TiKV cluster.
func SwitchMode(ctx context.Context, cfg *config.Config, tls *common.TLS, mode string) error {
func SwitchMode(ctx context.Context, cfg *config.Config, tls *common.TLS, mode string, ranges ...*import_sstpb.Range) error {
var m import_sstpb.SwitchMode
switch mode {
case config.ImportMode:
Expand All @@ -1069,7 +1069,7 @@ func SwitchMode(ctx context.Context, cfg *config.Config, tls *common.TLS, mode s
tls.WithHost(cfg.TiDB.PdAddr),
tikv.StoreStateDisconnected,
func(c context.Context, store *tikv.Store) error {
return tikv.SwitchMode(c, tls, store.Address, m)
return tikv.SwitchMode(c, tls, store.Address, m, ranges...)
},
)
}
Expand Down
11 changes: 9 additions & 2 deletions br/pkg/lightning/tikv/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,19 @@ func ignoreUnimplementedError(err error, logger log.Logger) error {
}

// SwitchMode changes the TiKV node at the given address to a particular mode.
func SwitchMode(ctx context.Context, tls *common.TLS, tikvAddr string, mode import_sstpb.SwitchMode) error {
func SwitchMode(
ctx context.Context,
tls *common.TLS,
tikvAddr string,
mode import_sstpb.SwitchMode,
ranges ...*import_sstpb.Range,
) error {
task := log.With(zap.Stringer("mode", mode),
zap.String("tikv", tikvAddr)).Begin(zap.DebugLevel, "switch mode")
err := withTiKVConnection(ctx, tls, tikvAddr, func(client import_sstpb.ImportSSTClient) error {
_, err := client.SwitchMode(ctx, &import_sstpb.SwitchModeRequest{
Mode: mode,
Mode: mode,
Ranges: ranges,
})
return ignoreUnimplementedError(err, task.Logger)
})
Expand Down
1 change: 1 addition & 0 deletions br/pkg/mock/mocklocal/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ go_library(
deps = [
"//br/pkg/lightning/backend",
"@com_github_golang_mock//gomock",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
],
)
27 changes: 19 additions & 8 deletions br/pkg/mock/mocklocal/local.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,13 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
// Start some background routine to manage TiFlash replica.
d.wg.Run(d.PollTiFlashRoutine)

ingest.InitGlobalLightningEnv()
ctx, err := d.sessPool.Get()
if err != nil {
return err
}
defer d.sessPool.Put(ctx)

ingest.InitGlobalLightningEnv(d.ctx, ctx)

return nil
}
Expand Down
Loading

0 comments on commit cede736

Please sign in to comment.