Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
backup: add raw backup command (#101)
Browse files Browse the repository at this point in the history
* backup: add raw backup command
  • Loading branch information
3pointer authored Mar 4, 2020
1 parent e462f80 commit 4657932
Show file tree
Hide file tree
Showing 12 changed files with 711 additions and 52 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ build_for_integration_test:
GO111MODULE=on go build -race -o bin/locker tests/br_key_locked/*.go
# build gc
GO111MODULE=on go build -race -o bin/gc tests/br_z_gc_safepoint/*.go
# build rawkv client
GO111MODULE=on go build -race -o bin/rawkv tests/br_rawkv/*.go

test:
GO111MODULE=on go test -race -tags leak ./...
Expand Down
23 changes: 23 additions & 0 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ func runBackupCommand(command *cobra.Command, cmdName string) error {
return task.RunBackup(GetDefaultContext(), tidbGlue, cmdName, &cfg)
}

func runBackupRawCommand(command *cobra.Command, cmdName string) error {
cfg := task.BackupRawConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
return err
}
return task.RunBackupRaw(GetDefaultContext(), tidbGlue, cmdName, &cfg)
}

// NewBackupCommand return a full backup subcommand.
func NewBackupCommand() *cobra.Command {
command := &cobra.Command{
Expand All @@ -43,6 +51,7 @@ func NewBackupCommand() *cobra.Command {
newFullBackupCommand(),
newDbBackupCommand(),
newTableBackupCommand(),
newRawBackupCommand(),
)

task.DefineBackupFlags(command.PersistentFlags())
Expand Down Expand Up @@ -87,3 +96,17 @@ func newTableBackupCommand() *cobra.Command {
task.DefineTableFlags(command)
return command
}

// newRawBackupCommand return a raw kv range backup subcommand.
func newRawBackupCommand() *cobra.Command {
command := &cobra.Command{
Use: "raw",
Short: "backup a raw kv range from TiKV cluster",
RunE: func(command *cobra.Command, _ []string) error {
return runBackupRawCommand(command, "Raw backup")
},
}

task.DefineRawBackupFlags(command)
return command
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/pingcap/tidb-tools v4.0.0-beta+incompatible
github.com/pingcap/tipb v0.0.0-20200212061130-c4d518eb1d60
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/common v0.4.1
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUW
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
Expand Down Expand Up @@ -614,6 +616,7 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn
google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
gopkg.in/alecthomas/gometalinter.v2 v2.0.12/go.mod h1:NDRytsqEZyolNuAgTzJkZMkSQM7FIKyzVzGhjB/qfYo=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mod h1:3HH7i1SgMqlzxCcBmUHW657sD4Kvv9sC3HpL3YukzwA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
99 changes: 49 additions & 50 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
Expand All @@ -36,7 +36,7 @@ import (

// ClientMgr manages connections needed by backup.
type ClientMgr interface {
GetBackupClient(ctx context.Context, storeID uint64) (backup.BackupClient, error)
GetBackupClient(ctx context.Context, storeID uint64) (kvproto.BackupClient, error)
GetPDClient() pd.Client
GetTiKV() tikv.Storage
GetLockResolver() *tikv.LockResolver
Expand All @@ -53,9 +53,9 @@ type Client struct {
mgr ClientMgr
clusterID uint64

backupMeta backup.BackupMeta
backupMeta kvproto.BackupMeta
storage storage.ExternalStorage
backend *backup.StorageBackend
backend *kvproto.StorageBackend
}

// NewBackupClient returns a new backup client
Expand Down Expand Up @@ -101,7 +101,7 @@ func (bc *Client) GetTS(ctx context.Context, duration time.Duration) (uint64, er
}

// SetStorage set ExternalStorage for client
func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend, sendCreds bool) error {
func (bc *Client) SetStorage(ctx context.Context, backend *kvproto.StorageBackend, sendCreds bool) error {
var err error
bc.storage, err = storage.Create(ctx, backend, sendCreds)
if err != nil {
Expand Down Expand Up @@ -222,7 +222,7 @@ func BuildBackupRangeAndSchema(
return nil, nil, errors.Trace(err)
}

schema := backup.Schema{
schema := kvproto.Schema{
Db: dbData,
Table: tableData,
}
Expand Down Expand Up @@ -296,10 +296,7 @@ func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*mod
func (bc *Client) BackupRanges(
ctx context.Context,
ranges []Range,
lastBackupTS uint64,
backupTS uint64,
rateLimit uint64,
concurrency uint32,
req kvproto.BackupRequest,
updateCh chan<- struct{},
) error {
start := time.Now()
Expand All @@ -313,8 +310,8 @@ func (bc *Client) BackupRanges(
defer cancel()
go func() {
for _, r := range ranges {
err := bc.backupRange(
ctx, r.StartKey, r.EndKey, lastBackupTS, backupTS, rateLimit, concurrency, updateCh)
err := bc.BackupRange(
ctx, r.StartKey, r.EndKey, req, updateCh)
if err != nil {
errCh <- err
return
Expand All @@ -329,7 +326,7 @@ func (bc *Client) BackupRanges(

finished := false
for {
err := CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), backupTS)
err := CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), req.EndVersion)
if err != nil {
log.Error("check GC safepoint failed", zap.Error(err))
return err
Expand All @@ -353,14 +350,11 @@ func (bc *Client) BackupRanges(
}
}

// backupRange make a backup of the given key range.
func (bc *Client) backupRange(
// BackupRange make a backup of the given key range.
func (bc *Client) BackupRange(
ctx context.Context,
startKey, endKey []byte,
lastBackupTS uint64,
backupTS uint64,
rateLimit uint64,
concurrency uint32,
req kvproto.BackupRequest,
updateCh chan<- struct{},
) (err error) {
start := time.Now()
Expand All @@ -377,8 +371,8 @@ func (bc *Client) backupRange(
log.Info("backup started",
zap.Binary("StartKey", startKey),
zap.Binary("EndKey", endKey),
zap.Uint64("RateLimit", rateLimit),
zap.Uint32("Concurrency", concurrency))
zap.Uint64("RateLimit", req.RateLimit),
zap.Uint32("Concurrency", req.Concurrency))
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -388,16 +382,11 @@ func (bc *Client) backupRange(
return errors.Trace(err)
}

req := backup.BackupRequest{
ClusterId: bc.clusterID,
StartKey: startKey,
EndKey: endKey,
StartVersion: lastBackupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
}
req.ClusterId = bc.clusterID
req.StartKey = startKey
req.EndKey = endKey
req.StorageBackend = bc.backend

push := newPushDown(ctx, bc.mgr, len(allStores))

var results RangeTree
Expand All @@ -410,17 +399,27 @@ func (bc *Client) backupRange(
// Find and backup remaining ranges.
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, lastBackupTS,
backupTS, rateLimit, concurrency, results, updateCh)
ctx, startKey, endKey, req.StartVersion,
req.EndVersion, req.RateLimit, req.Concurrency, results, updateCh)
if err != nil {
return err
}

bc.backupMeta.StartVersion = lastBackupTS
bc.backupMeta.EndVersion = backupTS
log.Info("backup time range",
zap.Reflect("StartVersion", lastBackupTS),
zap.Reflect("EndVersion", backupTS))
bc.backupMeta.StartVersion = req.StartVersion
bc.backupMeta.EndVersion = req.EndVersion
bc.backupMeta.IsRawKv = req.IsRawKv
if req.IsRawKv {
bc.backupMeta.RawRanges = append(bc.backupMeta.RawRanges,
&kvproto.RawRange{StartKey: startKey, EndKey: endKey, Cf: req.Cf})
log.Info("backup raw ranges",
zap.ByteString("startKey", startKey),
zap.ByteString("endKey", endKey),
zap.String("cf", req.Cf))
} else {
log.Info("backup time range",
zap.Reflect("StartVersion", req.StartVersion),
zap.Reflect("EndVersion", req.EndVersion))
}

results.tree.Ascend(func(i btree.Item) bool {
r := i.(*Range)
Expand Down Expand Up @@ -479,7 +478,7 @@ func (bc *Client) fineGrainedBackup(
}
log.Info("start fine grained backup", zap.Int("incomplete", len(incomplete)))
// Step2, retry backup on incomplete range
respCh := make(chan *backup.BackupResponse, 4)
respCh := make(chan *kvproto.BackupResponse, 4)
errCh := make(chan error, 4)
retry := make(chan Range, 4)

Expand Down Expand Up @@ -566,15 +565,15 @@ func onBackupResponse(
bo *tikv.Backoffer,
backupTS uint64,
lockResolver *tikv.LockResolver,
resp *backup.BackupResponse,
) (*backup.BackupResponse, int, error) {
resp *kvproto.BackupResponse,
) (*kvproto.BackupResponse, int, error) {
log.Debug("onBackupResponse", zap.Reflect("resp", resp))
if resp.Error == nil {
return resp, 0, nil
}
backoffMs := 0
switch v := resp.Error.Detail.(type) {
case *backup.Error_KvError:
case *kvproto.Error_KvError:
if lockErr := v.KvError.Locked; lockErr != nil {
// Try to resolve lock.
log.Warn("backup occur kv error", zap.Reflect("error", v))
Expand All @@ -592,7 +591,7 @@ func onBackupResponse(
log.Error("unexpect kv error", zap.Reflect("KvError", v.KvError))
return nil, backoffMs, errors.Errorf("onBackupResponse error %v", v)

case *backup.Error_RegionError:
case *kvproto.Error_RegionError:
regionErr := v.RegionError
// Ignore following errors.
if !(regionErr.EpochNotMatch != nil ||
Expand All @@ -610,7 +609,7 @@ func onBackupResponse(
// TODO: a better backoff.
backoffMs = 1000 /* 1s */
return nil, backoffMs, nil
case *backup.Error_ClusterIdError:
case *kvproto.Error_ClusterIdError:
log.Error("backup occur cluster ID error",
zap.Reflect("error", v))
err := errors.Errorf("%v", resp.Error)
Expand All @@ -631,7 +630,7 @@ func (bc *Client) handleFineGrained(
backupTS uint64,
rateLimit uint64,
concurrency uint32,
respCh chan<- *backup.BackupResponse,
respCh chan<- *kvproto.BackupResponse,
) (int, error) {
leader, pderr := bc.findRegionLeader(ctx, rg.StartKey)
if pderr != nil {
Expand All @@ -640,7 +639,7 @@ func (bc *Client) handleFineGrained(
storeID := leader.GetStoreId()
max := 0

req := backup.BackupRequest{
req := kvproto.BackupRequest{
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
Expand All @@ -659,7 +658,7 @@ func (bc *Client) handleFineGrained(
err = SendBackup(
ctx, storeID, client, req,
// Handle responses with the same backoffer.
func(resp *backup.BackupResponse) error {
func(resp *kvproto.BackupResponse) error {
response, backoffMs, err1 :=
onBackupResponse(bo, backupTS, lockResolver, resp)
if err1 != nil {
Expand All @@ -684,9 +683,9 @@ func (bc *Client) handleFineGrained(
func SendBackup(
ctx context.Context,
storeID uint64,
client backup.BackupClient,
req backup.BackupRequest,
respFn func(*backup.BackupResponse) error,
client kvproto.BackupClient,
req kvproto.BackupRequest,
respFn func(*kvproto.BackupResponse) error,
) error {
log.Info("try backup", zap.Any("backup request", req))
ctx, cancel := context.WithCancel(ctx)
Expand Down
4 changes: 3 additions & 1 deletion pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ func (mgr *Mgr) Close() {

// Gracefully shutdown domain so it does not affect other TiDB DDL.
// Must close domain before closing storage, otherwise it gets stuck forever.
mgr.dom.Close()
if mgr.dom != nil {
mgr.dom.Close()
}

atomic.StoreUint32(&tikv.ShuttingDown, 1)
mgr.storage.Close()
Expand Down
10 changes: 9 additions & 1 deletion pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/pingcap/errors"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/filter"
Expand Down Expand Up @@ -131,8 +132,15 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
// Redirect to log if there is no log file to avoid unreadable output.
updateCh := utils.StartProgress(
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

req := kvproto.BackupRequest{
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
RateLimit: cfg.RateLimit,
Concurrency: cfg.Concurrency,
}
err = client.BackupRanges(
ctx, ranges, cfg.LastBackupTS, backupTS, cfg.RateLimit, cfg.Concurrency, updateCh)
ctx, ranges, req, updateCh)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 4657932

Please sign in to comment.