Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
cherry-pick master to release 4.0 (#250)
Browse files Browse the repository at this point in the history
* enable tidb config by default (#230)

* enable tidb config by default

* backup,restore: fix --checksum flag. (#223)

* backup,restore: work on progress to fix a bug that causes --checksum flag won't work properly.

Some code of backup and restore ignored the flag (a.k.a. config.Checksum), so when checksum is disabled, we will face failure.

* backup: backup will report total bytes and kvs when checksums check disabled.

Some code of backup and restore ignored the flag (a.k.a. config.Checksum), so when checksum is disabled, we will face failure.

* backup: backup will report total bytes and kvs when checksums check disabled.

Some code of backup and restore ignored the flag (a.k.a. config.Checksum), so when checksum is disabled, we will face failure.

* backup: add log to ChecksumMatches and new version of FastChecksum.

Some of log has been lose. They are in ChecksumMatches now.

* restore: restore could find non-checksum tables and skip them automatically.

for backup, ChecksumMatches returns error now.

* misc: add document for Table::NoChecksum.

* backup: omit checksum progress bar when user specify `--checksum=false`.

* backup: `CopyMetaFrom` overrides original `client.Schemes` instead of append at its end.

* backup: refactor about checksum logic, fix a bug.

the bug would cause: when multi tables are backup, the metadata contains only one table.

* backup: do some lints.

* backup,restore: do some refactor so that cyclomatic complexity won't be too large.

* misc: don't use underscore on receiver.

* backup: print "quick checksum success" message per table.

...to make br_full_index happy!

* backup: refactor a MinInt pattern.

* backup: Apply suggestions from code review

Co-Authored-By: kennytm <kennytm@gmail.com>

Co-authored-by: 3pointer <luancheng@pingcap.com>
Co-authored-by: kennytm <kennytm@gmail.com>

* Use table info (#231)

* create with info during incremental restore

* add test

* fix log

* address comment

* fix ci

* address commemnt

* backup: generate backupmeta when backup empty. (#235)

* cmd: don't use ':' in the default log file name (#236)

* Update build status badge (#239)

* pass sse_kms_key_id to S3 (#243)

* redirect kvproto

Signed-off-by: Yi Wu <yiwu@pingcap.com>

* pass sse_kms_key_id

Signed-off-by: Yi Wu <yiwu@pingcap.com>

* fix hound

Signed-off-by: Yi Wu <yiwu@pingcap.com>

* update kvproto

Signed-off-by: Yi Wu <yiwu@pingcap.com>

* go mod tidy

Signed-off-by: Yi Wu <yiwu@pingcap.com>

Co-authored-by: kennytm <kennytm@gmail.com>

* storage: support placing the S3/GCS options into the storage URL (#246)

* update 4.0 dependency

* go mod tidy

* Update README.md

Co-Authored-By: Neil Shen <overvenus@gmail.com>

Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com>
Co-authored-by: kennytm <kennytm@gmail.com>
Co-authored-by: Neil Shen <overvenus@gmail.com>
Co-authored-by: yiwu-arbug <yiwu@pingcap.com>
  • Loading branch information
5 people authored Apr 26, 2020
1 parent 69444ab commit a7b498e
Show file tree
Hide file tree
Showing 25 changed files with 645 additions and 186 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ backupmeta
coverage.txt
docker/data/
docker/logs/
*.swp
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# BR

[![Build Status](https://internal.pingcap.net/idc-jenkins/job/build_br_master/badge/icon)](https://internal.pingcap.net/idc-jenkins/job/build_br_master/)
[![Build Status](https://internal.pingcap.net/idc-jenkins/job/build_br_multi_branch/job/release-4.0/badge/icon)](https://internal.pingcap.net/idc-jenkins/job/build_br_multi_branch/job/release-4.0/)
[![codecov](https://codecov.io/gh/pingcap/br/branch/master/graph/badge.svg)](https://codecov.io/gh/pingcap/br)
[![LICENSE](https://img.shields.io/github/license/pingcap/br.svg)](https://github.com/pingcap/br/blob/master/LICENSE)
[![Language](https://img.shields.io/badge/Language-Go-blue.svg)](https://golang.org/)
Expand Down
2 changes: 1 addition & 1 deletion cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
)

func timestampLogFileName() string {
return filepath.Join(os.TempDir(), "br.log."+time.Now().Format(time.RFC3339))
return filepath.Join(os.TempDir(), time.Now().Format("br.log.2006-01-02T15.04.05Z0700"))
}

// AddFlags adds flags to the given cmd.
Expand Down
12 changes: 5 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,24 @@ require (
github.com/cheggaaa/pb/v3 v3.0.1
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/fatih/color v1.9.0 // indirect
github.com/fsouza/fake-gcs-server v1.15.0
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.3.1
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.1
github.com/klauspost/cpuid v1.2.0 // indirect
github.com/mattn/go-runewidth v0.0.7 // indirect
github.com/montanaflynn/stats v0.5.0 // indirect
github.com/onsi/ginkgo v1.11.0 // indirect
github.com/onsi/gomega v1.8.1 // indirect
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904
github.com/pingcap/kvproto v0.0.0-20200424032552-6650270c39c3
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200326020624-68d423641be5
github.com/pingcap/pd/v4 v4.0.0-beta.1.0.20200305072537-61d9f9cc35d3
github.com/pingcap/tidb v0.0.0-20200401141416-959eca8f3a39
github.com/pingcap/parser v0.0.0-20200425031156-fb338edcaac2
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2
github.com/pingcap/tidb v1.1.0-beta.0.20200424160056-7267747ae0ec
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200421113014-507d2bb3a15e+incompatible
github.com/pingcap/tipb v0.0.0-20200212061130-c4d518eb1d60
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/common v0.4.1
github.com/sirupsen/logrus v1.4.2
Expand Down
74 changes: 47 additions & 27 deletions go.sum

Large diffs are not rendered by default.

103 changes: 82 additions & 21 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ type ClientMgr interface {
Close()
}

// Checksum is the checksum of some backup files calculated by CollectChecksums.
type Checksum struct {
Crc64Xor uint64
TotalKvs uint64
TotalBytes uint64
}

// Maximum total sleep time(in ms) for kv/cop commands.
const (
backupFineGrainedMaxBackoff = 80000
Expand Down Expand Up @@ -748,8 +755,59 @@ func SendBackup(
return nil
}

// FastChecksum check data integrity by xor all(sst_checksum) per table
func (bc *Client) FastChecksum() (bool, error) {
// ChecksumMatches tests whether the "local" checksum matches the checksum from TiKV.
func (bc *Client) ChecksumMatches(local []Checksum) (bool, error) {
if len(local) != len(bc.backupMeta.Schemas) {
return false, nil
}

for i, schema := range bc.backupMeta.Schemas {
localChecksum := local[i]
dbInfo := &model.DBInfo{}
err := json.Unmarshal(schema.Db, dbInfo)
if err != nil {
log.Error("failed in fast checksum, and cannot parse db info.")
return false, err
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
log.Error("failed in fast checksum, and cannot parse table info.")
return false, err
}
if localChecksum.Crc64Xor != schema.Crc64Xor ||
localChecksum.TotalBytes != schema.TotalBytes ||
localChecksum.TotalKvs != schema.TotalKvs {
log.Error("failed in fast checksum",
zap.Stringer("db", dbInfo.Name),
zap.Stringer("table", tblInfo.Name),
zap.Uint64("origin tidb crc64", schema.Crc64Xor),
zap.Uint64("calculated crc64", localChecksum.Crc64Xor),
zap.Uint64("origin tidb total kvs", schema.TotalKvs),
zap.Uint64("calculated total kvs", localChecksum.TotalKvs),
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", localChecksum.TotalBytes),
)
return false, nil
}
log.Info("fast checksum success",
zap.String("database", dbInfo.Name.L),
zap.String("table", tblInfo.Name.L))
}
return true, nil
}

// CollectFileInfo collects ungrouped file summary information, like kv count and size.
func (bc *Client) CollectFileInfo() {
for _, file := range bc.backupMeta.Files {
summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes)
}
}

// CollectChecksums check data integrity by xor all(sst_checksum) per table
// it returns the checksum of all local files.
func (bc *Client) CollectChecksums() ([]Checksum, error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
Expand All @@ -758,19 +816,20 @@ func (bc *Client) FastChecksum() (bool, error) {

dbs, err := utils.LoadBackupTables(&bc.backupMeta)
if err != nil {
return false, err
return nil, err
}

checksums := make([]Checksum, 0, len(bc.backupMeta.Schemas))
for _, schema := range bc.backupMeta.Schemas {
dbInfo := &model.DBInfo{}
err = json.Unmarshal(schema.Db, dbInfo)
if err != nil {
return false, err
return nil, err
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
return false, err
return nil, err
}
tbl := dbs[dbInfo.Name.String()].GetTable(tblInfo.Name.String())

Expand All @@ -785,25 +844,16 @@ func (bc *Client) FastChecksum() (bool, error) {

summary.CollectSuccessUnit(summary.TotalKV, 1, totalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes)

if schema.Crc64Xor == checksum && schema.TotalKvs == totalKvs && schema.TotalBytes == totalBytes {
log.Info("fast checksum success", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
} else {
log.Error("failed in fast checksum",
zap.String("database", dbInfo.Name.String()),
zap.String("table", tblInfo.Name.String()),
zap.Uint64("origin tidb crc64", schema.Crc64Xor),
zap.Uint64("calculated crc64", checksum),
zap.Uint64("origin tidb total kvs", schema.TotalKvs),
zap.Uint64("calculated total kvs", totalKvs),
zap.Uint64("origin tidb total bytes", schema.TotalBytes),
zap.Uint64("calculated total bytes", totalBytes),
)
return false, nil
log.Info("fast checksum calculated", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tblInfo.Name))
localChecksum := Checksum{
Crc64Xor: checksum,
TotalKvs: totalKvs,
TotalBytes: totalBytes,
}
checksums = append(checksums, localChecksum)
}

return true, nil
return checksums, nil
}

// CompleteMeta wait response of admin checksum from TiDB to complete backup meta
Expand All @@ -815,3 +865,14 @@ func (bc *Client) CompleteMeta(backupSchemas *Schemas) error {
bc.backupMeta.Schemas = schemas
return nil
}

// CopyMetaFrom copies schema metadata directly from pending backupSchemas, without calculating checksum.
// use this when user skip the checksum generating.
func (bc *Client) CopyMetaFrom(backupSchemas *Schemas) {
schemas := make([]*kvproto.Schema, 0, len(backupSchemas.schemas))
for _, v := range backupSchemas.schemas {
schema := v
schemas = append(schemas, &schema)
}
bc.backupMeta.Schemas = schemas
}
12 changes: 0 additions & 12 deletions pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type Schemas struct {
backupSchemaCh chan backup.Schema
errCh chan error
wg *sync.WaitGroup
skipChecksum bool
}

func newBackupSchemas() *Schemas {
Expand All @@ -57,11 +56,6 @@ func (pending *Schemas) pushPending(
pending.schemas[name] = schema
}

// SetSkipChecksum sets whether it should skip checksum
func (pending *Schemas) SetSkipChecksum(skip bool) {
pending.skipChecksum = skip
}

// Start backups schemas
func (pending *Schemas) Start(
ctx context.Context,
Expand All @@ -81,12 +75,6 @@ func (pending *Schemas) Start(
workerPool.Apply(func() {
defer pending.wg.Done()

if pending.skipChecksum {
pending.backupSchemaCh <- schema
updateCh.Inc()
return
}

start := time.Now()
table := model.TableInfo{}
err := json.Unmarshal(schema.Table, &table)
Expand Down
8 changes: 8 additions & 0 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,14 @@ func (rc *Client) ValidateChecksum(
workers.Apply(func() {
defer wg.Done()

if table.NoChecksum() {
log.Info("table doesn't have checksum, skipping checksum",
zap.Stringer("db", table.Db.Name),
zap.Stringer("table", table.Info.Name))
updateCh.Inc()
return
}

startTS, err := rc.GetTS(ctx)
if err != nil {
errCh <- errors.Trace(err)
Expand Down
22 changes: 21 additions & 1 deletion pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,27 @@ func NewDB(g glue.Glue, store kv.Storage) (*DB, error) {
// ExecDDL executes the query of a ddl job.
func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
var err error
if ddlJob.BinlogInfo.TableInfo != nil {
tableInfo := ddlJob.BinlogInfo.TableInfo
dbInfo := ddlJob.BinlogInfo.DBInfo
switch ddlJob.Type {
case model.ActionCreateSchema:
err = db.se.CreateDatabase(ctx, dbInfo)
if err != nil {
log.Error("create database failed", zap.Stringer("db", dbInfo.Name), zap.Error(err))
}
return errors.Trace(err)
case model.ActionCreateTable:
err = db.se.CreateTable(ctx, model.NewCIStr(ddlJob.SchemaName), tableInfo)
if err != nil {
log.Error("create table failed",
zap.Stringer("db", dbInfo.Name),
zap.Stringer("table", tableInfo.Name),
zap.Error(err))
}
return errors.Trace(err)
}

if tableInfo != nil {
switchDbSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName))
err = db.se.Execute(ctx, switchDbSQL)
if err != nil {
Expand Down
75 changes: 66 additions & 9 deletions pkg/storage/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package storage

import (
"net/url"
"reflect"
"strconv"
"strings"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -46,19 +48,23 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backup.StorageBacken
}
prefix := strings.Trim(u.Path, "/")
s3 := &backup.S3{Bucket: u.Host, Prefix: prefix}
if options != nil {
if err := options.S3.apply(s3); err != nil {
return nil, err
}
if options == nil {
options = &BackendOptions{}
}
ExtractQueryParameters(u, &options.S3)
if err := options.S3.apply(s3); err != nil {
return nil, err
}
return &backup.StorageBackend{Backend: &backup.StorageBackend_S3{S3: s3}}, nil

case "gcs":
case "gs", "gcs":
gcs := &backup.GCS{Bucket: u.Host, Prefix: u.Path[1:]}
if options != nil {
if err := options.GCS.apply(gcs); err != nil {
return nil, err
}
if options == nil {
options = &BackendOptions{}
}
ExtractQueryParameters(u, &options.GCS)
if err := options.GCS.apply(gcs); err != nil {
return nil, err
}
return &backup.StorageBackend{Backend: &backup.StorageBackend_Gcs{Gcs: gcs}}, nil

Expand All @@ -67,6 +73,57 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backup.StorageBacken
}
}

// ExtractQueryParameters moves the query parameters of the URL into the options
// using reflection.
//
// The options must be a pointer to a struct which contains only string or bool
// fields (more types will be supported in the future), and tagged for JSON
// serialization.
//
// All of the URL's query parameters will be removed after calling this method.
func ExtractQueryParameters(u *url.URL, options interface{}) {
type field struct {
index int
kind reflect.Kind
}

// First, find all JSON fields in the options struct type.
o := reflect.Indirect(reflect.ValueOf(options))
ty := o.Type()
numFields := ty.NumField()
tagToField := make(map[string]field, numFields)
for i := 0; i < numFields; i++ {
f := ty.Field(i)
tag := f.Tag.Get("json")
tagToField[tag] = field{index: i, kind: f.Type.Kind()}
}

// Then, read content from the URL into the options.
for key, params := range u.Query() {
if len(params) == 0 {
continue
}
param := params[0]
normalizedKey := strings.ToLower(strings.ReplaceAll(key, "_", "-"))
if f, ok := tagToField[normalizedKey]; ok {
field := o.Field(f.index)
switch f.kind {
case reflect.Bool:
if v, e := strconv.ParseBool(param); e == nil {
field.SetBool(v)
}
case reflect.String:
field.SetString(param)
default:
panic("BackendOption introduced an unsupported kind, please handle it! " + f.kind.String())
}
}
}

// Clean up the URL finally.
u.RawQuery = ""
}

// FormatBackendURL obtains the raw URL which can be used the reconstruct the
// backend. The returned URL does not contain options for further configurating
// the backend. This is to avoid exposing secret tokens.
Expand Down
Loading

0 comments on commit a7b498e

Please sign in to comment.