Skip to content

Commit

Permalink
*: extract storage package, and update kvproto (pingcap#88)
Browse files Browse the repository at this point in the history
* *: extract storage package, and update kvproto

* go.mod: point to the actual master version

* storage: changed BackendOptions to a unified structure

* go.mod,stroage: update kvproto, remove incompatible storage option check

* cmd: fix go fmt and lint

* storage: recognize s3.region to fix lint
  • Loading branch information
kennytm authored and overvenus committed Dec 11, 2019
1 parent 89c1996 commit a7778a5
Show file tree
Hide file tree
Showing 20 changed files with 421 additions and 187 deletions.
19 changes: 7 additions & 12 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/spf13/cobra"

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/utils"
)

Expand Down Expand Up @@ -74,13 +75,10 @@ func newFullBackupCommand() *cobra.Command {
if err != nil {
return nil
}
u, err := command.Flags().GetString(FlagStorage)
u, err := storage.ParseBackendFromFlags(command.Flags(), FlagStorage)
if err != nil {
return err
}
if u == "" {
return errors.New("empty backup store is not allowed")
}

err = client.SetStorage(u)
if err != nil {
Expand Down Expand Up @@ -136,7 +134,7 @@ func newFullBackupCommand() *cobra.Command {
updateCh := utils.StartProgress(
ctx, "Full Backup", int64(approximateRegions), !HasLogFile())
err = client.BackupRanges(
ctx, ranges, u, backupTS, rate, concurrency, updateCh)
ctx, ranges, backupTS, rate, concurrency, updateCh)
if err != nil {
return err
}
Expand Down Expand Up @@ -173,7 +171,7 @@ func newFullBackupCommand() *cobra.Command {
// Checksum has finished
close(updateCh)

return client.SaveBackupMeta(u)
return client.SaveBackupMeta()
},
}
return command
Expand All @@ -198,13 +196,10 @@ func newTableBackupCommand() *cobra.Command {
if err != nil {
return err
}
u, err := command.Flags().GetString(FlagStorage)
u, err := storage.ParseBackendFromFlags(command.Flags(), FlagStorage)
if err != nil {
return err
}
if u == "" {
return errors.New("empty backup store is not allowed")
}

err = client.SetStorage(u)
if err != nil {
Expand Down Expand Up @@ -278,7 +273,7 @@ func newTableBackupCommand() *cobra.Command {
updateCh := utils.StartProgress(
ctx, "Table Backup", int64(approximateRegions), !HasLogFile())
err = client.BackupRanges(
ctx, ranges, u, backupTS, rate, concurrency, updateCh)
ctx, ranges, backupTS, rate, concurrency, updateCh)
if err != nil {
return err
}
Expand Down Expand Up @@ -311,7 +306,7 @@ func newTableBackupCommand() *cobra.Command {
// Checksum has finished
close(updateCh)

return client.SaveBackupMeta(u)
return client.SaveBackupMeta()
},
}
command.Flags().StringP("db", "", "", "backup a table in the specific db")
Expand Down
2 changes: 2 additions & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/storage"
)

var (
Expand Down Expand Up @@ -65,6 +66,7 @@ func AddFlags(cmd *cobra.Command) {
"Set the log file path. If not set, logs will output to stdout")
cmd.PersistentFlags().String(FlagStatusAddr, "",
"Set the HTTP listening address for the status report service. Set to empty string to disable")
storage.DefineFlags(cmd.PersistentFlags())

cmd.PersistentFlags().StringP(FlagSlowLogFile, "", "",
"Set the slow log file path. If not set, discard slow logs")
Expand Down
10 changes: 4 additions & 6 deletions cmd/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spf13/cobra"
"go.uber.org/zap"

pkgstorage "github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/utils"
)

Expand All @@ -35,14 +36,11 @@ func NewMetaCommand() *cobra.Command {
Use: "checksum",
Short: "check the backup data",
RunE: func(cmd *cobra.Command, _ []string) error {
u, err := cmd.Flags().GetString("storage")
u, err := pkgstorage.ParseBackendFromFlags(cmd.Flags(), FlagStorage)
if err != nil {
return errors.Trace(err)
}
if u == "" {
return errors.New("empty backup store is not allowed")
return err
}
storage, err := utils.CreateStorage(u)
storage, err := pkgstorage.Create(u)
if err != nil {
return errors.Trace(err)
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
flag "github.com/spf13/pflag"

"github.com/pingcap/br/pkg/restore"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/utils"
)

Expand Down Expand Up @@ -382,11 +383,11 @@ func newTableRestoreCommand() *cobra.Command {
}

func initRestoreClient(client *restore.Client, flagSet *flag.FlagSet) error {
u, err := flagSet.GetString(FlagStorage)
u, err := storage.ParseBackendFromFlags(flagSet, FlagStorage)
if err != nil {
return err
}
s, err := utils.CreateStorage(u)
s, err := storage.Create(u)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/onsi/gomega v1.7.1 // indirect
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errors v0.11.4
github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8
github.com/pingcap/kvproto v0.0.0-20191210040729-c23886becb54
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6
github.com/pingcap/pd v1.1.0-beta.0.20191115131715-6b7dc037010e
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20191018025622-fbf07f9804da/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8 h1:P9jGgwVkLHlbEGtgGKrY0k/yy6N8L8Gdj8dliFswllU=
github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191210040729-c23886becb54 h1:T8myp+i7bPLy/W4rEjtsAZgjGTqQ0rnLu9xQ4YAfXJU=
github.com/pingcap/kvproto v0.0.0-20191210040729-c23886becb54/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6 h1:KrJorS9gGYMhsQjENNWAeB5ho28xbowZ74pfJWkOmFc=
Expand Down
57 changes: 28 additions & 29 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/utils"
)

Expand All @@ -48,7 +49,8 @@ type Client struct {
clusterID uint64

backupMeta backup.BackupMeta
storage utils.ExternalStorage
storage storage.ExternalStorage
backend *backup.StorageBackend
}

// NewBackupClient returns a new backup client
Expand Down Expand Up @@ -98,29 +100,30 @@ func (bc *Client) GetTS(ctx context.Context, timeAgo string) (uint64, error) {
}

// SetStorage set ExternalStorage for client
func (bc *Client) SetStorage(base string) error {
func (bc *Client) SetStorage(backend *backup.StorageBackend) error {
var err error
bc.storage, err = utils.CreateStorage(base)
bc.storage, err = storage.Create(backend)
if err != nil {
return err
}
// backupmeta already exists
if exist := bc.storage.FileExists(utils.MetaFile); exist {
return errors.New("backup meta exists, may be some backup files in the path already")
}
bc.backend = backend
return nil
}

// SaveBackupMeta saves the current backup meta at the given path.
func (bc *Client) SaveBackupMeta(path string) error {
bc.backupMeta.Path = path
func (bc *Client) SaveBackupMeta() error {
backupMetaData, err := proto.Marshal(&bc.backupMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("backup meta",
zap.Reflect("meta", bc.backupMeta))
log.Info("save backup meta", zap.String("path", path))
backendURL := storage.FormatBackendURL(bc.backend)
log.Info("save backup meta", zap.Stringer("path", &backendURL))
return bc.storage.Write(utils.MetaFile, backupMetaData)
}

Expand Down Expand Up @@ -265,7 +268,6 @@ LoadDb:
func (bc *Client) BackupRanges(
ctx context.Context,
ranges []Range,
path string,
backupTS uint64,
rate uint64,
concurrency uint32,
Expand All @@ -283,7 +285,7 @@ func (bc *Client) BackupRanges(
go func() {
for _, r := range ranges {
err := bc.backupRange(
ctx, r.StartKey, r.EndKey, path, backupTS, rate, concurrency, updateCh)
ctx, r.StartKey, r.EndKey, backupTS, rate, concurrency, updateCh)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -326,7 +328,6 @@ func (bc *Client) BackupRanges(
func (bc *Client) backupRange(
ctx context.Context,
startKey, endKey []byte,
path string,
backupTS uint64,
rateMBs uint64,
concurrency uint32,
Expand All @@ -348,14 +349,14 @@ func (bc *Client) backupRange(
return errors.Trace(err)
}
req := backup.BackupRequest{
ClusterId: bc.clusterID,
StartKey: startKey,
EndKey: endKey,
StartVersion: backupTS,
EndVersion: backupTS,
Path: path,
RateLimit: rateLimit,
Concurrency: concurrency,
ClusterId: bc.clusterID,
StartKey: startKey,
EndKey: endKey,
StartVersion: backupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
}
push := newPushDown(ctx, bc.mgr, len(allStores))

Expand All @@ -369,7 +370,7 @@ func (bc *Client) backupRange(
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey,
backupTS, path, rateLimit, concurrency, results, updateCh)
backupTS, rateLimit, concurrency, results, updateCh)
if err != nil {
return err
}
Expand Down Expand Up @@ -424,7 +425,6 @@ func (bc *Client) fineGrainedBackup(
ctx context.Context,
startKey, endKey []byte,
backupTS uint64,
path string,
rateLimit uint64,
concurrency uint32,
rangeTree RangeTree,
Expand Down Expand Up @@ -455,7 +455,7 @@ func (bc *Client) fineGrainedBackup(
defer wg.Done()
for rg := range retry {
backoffMs, err :=
bc.handleFineGrained(ctx, boFork, rg, backupTS, path, rateLimit, concurrency, respCh)
bc.handleFineGrained(ctx, boFork, rg, backupTS, rateLimit, concurrency, respCh)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -587,7 +587,6 @@ func (bc *Client) handleFineGrained(
bo *tikv.Backoffer,
rg Range,
backupTS uint64,
path string,
rateLimit uint64,
concurrency uint32,
respCh chan<- *backup.BackupResponse,
Expand All @@ -599,14 +598,14 @@ func (bc *Client) handleFineGrained(
storeID := leader.GetStoreId()
max := 0
req := backup.BackupRequest{
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
StartVersion: backupTS,
EndVersion: backupTS,
Path: path,
RateLimit: rateLimit,
Concurrency: concurrency,
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
StartVersion: backupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
}
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
Expand Down
4 changes: 2 additions & 2 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (rc *Client) Close() {
}

// InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient
func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, storagePath string) error {
func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup.StorageBackend) error {
databases, err := utils.LoadBackupTables(backupMeta)
if err != nil {
return errors.Trace(err)
Expand All @@ -95,7 +95,7 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, storagePath stri

metaClient := restore_util.NewClient(rc.pdClient)
importClient := NewImportClient(metaClient)
rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, storagePath)
rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend)
return nil
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (ic *importClient) getImportClient(
type FileImporter struct {
metaClient restore_util.Client
importClient ImporterClient
fileURL string
backend *backup.StorageBackend

ctx context.Context
cancel context.CancelFunc
Expand All @@ -125,12 +125,12 @@ func NewFileImporter(
ctx context.Context,
metaClient restore_util.Client,
importClient ImporterClient,
fileURL string,
backend *backup.StorageBackend,
) FileImporter {
ctx, cancel := context.WithCancel(ctx)
return FileImporter{
metaClient: metaClient,
fileURL: fileURL,
backend: backend,
ctx: ctx,
cancel: cancel,
importClient: importClient,
Expand Down Expand Up @@ -236,10 +236,10 @@ func (importer *FileImporter) downloadSST(
sstMeta.RegionId = regionInfo.Region.GetId()
sstMeta.RegionEpoch = regionInfo.Region.GetRegionEpoch()
req := &import_sstpb.DownloadRequest{
Sst: sstMeta,
Url: importer.fileURL,
Name: file.GetName(),
RewriteRule: *regionRule,
Sst: sstMeta,
StorageBackend: importer.backend,
Name: file.GetName(),
RewriteRule: *regionRule,
}
var resp *import_sstpb.DownloadResponse
for _, peer := range regionInfo.Region.GetPeers() {
Expand Down
Loading

0 comments on commit a7778a5

Please sign in to comment.