Skip to content

Commit

Permalink
Incremental BR: support DDL (pingcap#155)
Browse files Browse the repository at this point in the history
* support backup&restore ddl

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* integration tests

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* update kvproto

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix integration tests

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* reduce cyclomatic complexity of `runRestore`

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix test

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* add unit test

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix tests

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* disable fast checksum in incremental br

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix no valid key error

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* address lint

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* address comments

Signed-off-by: 5kbpers <tangminghua@pingcap.com>
  • Loading branch information
5kbpers authored and 3pointer committed Feb 21, 2020
1 parent 78c162a commit 45aa9c8
Show file tree
Hide file tree
Showing 21 changed files with 521 additions and 96 deletions.
10 changes: 5 additions & 5 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,19 @@ func newBackupMetaCommand() *cobra.Command {
newTable := new(model.TableInfo)
tableID, _ := tableIDAllocator.Alloc()
newTable.ID = int64(tableID)
newTable.Name = table.Schema.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Schema.Indices))
for i, indexInfo := range table.Schema.Indices {
newTable.Name = table.Info.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Info.Indices))
for i, indexInfo := range table.Info.Indices {
indexID, _ := indexIDAllocator.Alloc()
newTable.Indices[i] = &model.IndexInfo{
ID: int64(indexID),
Name: indexInfo.Name,
}
}
rules := restore.GetRewriteRules(newTable, table.Schema, 0)
rules := restore.GetRewriteRules(newTable, table.Info, 0)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
tableIDMap[table.Schema.ID] = int64(tableID)
tableIDMap[table.Info.ID] = int64(tableID)
}
// Validate rewrite rules
for _, file := range files {
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ require (
github.com/onsi/gomega v1.7.1 // indirect
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/kvproto v0.0.0-20200214082216-7ccc45d0063f
github.com/pingcap/kvproto v0.0.0-20200221031907-7adaad393963
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20200213042211-e357ed5f237b
github.com/pingcap/pd v1.1.0-beta.0.20200213133706-fbbe75e180e6
Expand Down
9 changes: 4 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,12 @@ github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 h1:58naV4XMEqm0h
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+v8Jlc98uMBvKIzr1a+UhnLyVYn8Q5Q=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20200213074014-83e827908584 h1:DhQfXNn9m36b2/4zUfPHDDR6CwS2VONbfPC4s+LMVj0=
github.com/pingcap/kvproto v0.0.0-20200213074014-83e827908584/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200214082216-7ccc45d0063f h1:wi4TNMBfsgiMsOlTSHBq4JKFViabIA1W0d+owiLtp70=
github.com/pingcap/kvproto v0.0.0-20200214082216-7ccc45d0063f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200221031907-7adaad393963 h1:QqIzAqo6C3rrekgIjIduCWUw7lEDnzHD/JKdMffwaR0=
github.com/pingcap/kvproto v0.0.0-20200221031907-7adaad393963/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20200213042211-e357ed5f237b h1:oKql7mOA71N7NxMn3MHtYcxntXrFxNPDMDalF/dW3iM=
Expand Down Expand Up @@ -585,6 +584,7 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191107010934-f79515f33823/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2 h1:EtTFh6h4SAKemS+CURDMTDIANuduG5zKEXShyy18bGA=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFqUMtcTHnomIPS1iv3G4Ms=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200107184032-11e9d9cc0042 h1:BKiPVwWbEdmAh+5CBwk13CYeVJQRDJpDnKgDyMOGz9M=
Expand Down Expand Up @@ -620,7 +620,6 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.24.0 h1:vb/1TCsVn3DcJlQ0Gs1yB1pKI6Do2/QNwxdKqmc/b0s=
google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
Expand Down
55 changes: 53 additions & 2 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -119,15 +120,20 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend
}

// SaveBackupMeta saves the current backup meta at the given path.
func (bc *Client) SaveBackupMeta(ctx context.Context) error {
func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) error {
ddlJobsData, err := json.Marshal(ddlJobs)
if err != nil {
return errors.Trace(err)
}
bc.backupMeta.Ddls = ddlJobsData
backupMetaData, err := proto.Marshal(&bc.backupMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("backup meta",
zap.Reflect("meta", bc.backupMeta))
backendURL := storage.FormatBackendURL(bc.backend)
log.Info("save backup meta", zap.Stringer("path", &backendURL))
log.Info("save backup meta", zap.Stringer("path", &backendURL), zap.Int("jobs", len(ddlJobs)))
return bc.storage.Write(ctx, utils.MetaFile, backupMetaData)
}

Expand Down Expand Up @@ -241,6 +247,51 @@ func BuildBackupRangeAndSchema(
return ranges, backupSchemas, nil
}

// GetBackupDDLJobs returns the ddl jobs are done in (lastBackupTS, backupTS]
func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*model.Job, error) {
snapMeta, err := dom.GetSnapshotMeta(backupTS)
if err != nil {
return nil, errors.Trace(err)
}
lastSnapMeta, err := dom.GetSnapshotMeta(lastBackupTS)
if err != nil {
return nil, errors.Trace(err)
}
lastSchemaVersion, err := lastSnapMeta.GetSchemaVersion()
if err != nil {
return nil, errors.Trace(err)
}
allJobs := make([]*model.Job, 0)
defaultJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get default jobs", zap.Int("jobs", len(defaultJobs)))
allJobs = append(allJobs, defaultJobs...)
addIndexJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey)
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get add index jobs", zap.Int("jobs", len(addIndexJobs)))
allJobs = append(allJobs, addIndexJobs...)
historyJobs, err := snapMeta.GetAllHistoryDDLJobs()
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
allJobs = append(allJobs, historyJobs...)

completedJobs := make([]*model.Job, 0)
for _, job := range allJobs {
if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
completedJobs = append(completedJobs, job)
}
}
log.Debug("get completed jobs", zap.Int("jobs", len(completedJobs)))
return completedJobs, nil
}

// BackupRanges make a backup of the given key ranges.
func (bc *Client) BackupRanges(
ctx context.Context,
Expand Down
8 changes: 4 additions & 4 deletions pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func buildChecksumRequest(
reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1))
var oldTableID int64
if oldTable != nil {
oldTableID = oldTable.Schema.ID
oldTableID = oldTable.Info.ID
}
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS)
if err != nil {
Expand All @@ -72,7 +72,7 @@ func buildChecksumRequest(
for _, partDef := range partDefs {
var oldPartID int64
if oldTable != nil {
for _, oldPartDef := range oldTable.Schema.Partition.Definitions {
for _, oldPartDef := range oldTable.Info.Partition.Definitions {
if oldPartDef.Name == partDef.Name {
oldPartID = oldPartDef.ID
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func buildRequest(
}
var oldIndexInfo *model.IndexInfo
if oldTable != nil {
for _, oldIndex := range oldTable.Schema.Indices {
for _, oldIndex := range oldTable.Info.Indices {
if oldIndex.Name == indexInfo.Name {
oldIndexInfo = oldIndex
break
Expand All @@ -117,7 +117,7 @@ func buildRequest(
if oldIndexInfo == nil {
log.Panic("index not found",
zap.Reflect("table", tableInfo),
zap.Reflect("oldTable", oldTable.Schema),
zap.Reflect("oldTable", oldTable.Info),
zap.Stringer("index", indexInfo.Name))
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/checksum/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *testChecksumSuite) TestChecksum(c *C) {
// Test rewrite rules
tk.MustExec("alter table t1 add index i2(a);")
tableInfo1 = s.getTableInfo(c, "test", "t1")
oldTable := utils.Table{Schema: tableInfo1}
oldTable := utils.Table{Info: tableInfo1}
exe2, err = NewExecutorBuilder(tableInfo2, math.MaxUint64).
SetOldTable(&oldTable).Build()
c.Assert(err, IsNil)
Expand Down
41 changes: 38 additions & 3 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package restore

import (
"context"
"encoding/json"
"math"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -40,6 +42,7 @@ type Client struct {
tableWorkerPool *utils.WorkerPool

databases map[string]*utils.Database
ddlJobs []*model.Job
backupMeta *backup.BackupMeta
db *DB
rateLimit uint64
Expand Down Expand Up @@ -97,8 +100,15 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup.
if err != nil {
return errors.Trace(err)
}
var ddlJobs []*model.Job
err = json.Unmarshal(backupMeta.GetDdls(), &ddlJobs)
if err != nil {
return errors.Trace(err)
}
rc.databases = databases
rc.ddlJobs = ddlJobs
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

metaClient := NewSplitClient(rc.pdClient)
importClient := NewImportClient(metaClient)
Expand Down Expand Up @@ -151,6 +161,11 @@ func (rc *Client) GetDatabase(name string) *utils.Database {
return rc.databases[name]
}

// GetDDLJobs returns ddl jobs
func (rc *Client) GetDDLJobs() []*model.Job {
return rc.ddlJobs
}

// GetTableSchema returns the schema of a table from TiDB.
func (rc *Client) GetTableSchema(
dom *domain.Domain,
Expand Down Expand Up @@ -189,18 +204,38 @@ func (rc *Client) CreateTables(
if err != nil {
return nil, nil, err
}
newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Schema.Name)
newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Info.Name)
if err != nil {
return nil, nil, err
}
rules := GetRewriteRules(newTableInfo, table.Schema, newTS)
rules := GetRewriteRules(newTableInfo, table.Info, newTS)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
newTables = append(newTables, newTableInfo)
}
return rewriteRules, newTables, nil
}

// ExecDDLs executes the queries of the ddl jobs.
func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {
// Sort the ddl jobs by schema version in ascending order.
sort.Slice(ddlJobs, func(i, j int) bool {
return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion
})

for _, job := range ddlJobs {
err := rc.db.ExecDDL(rc.ctx, job)
if err != nil {
return errors.Trace(err)
}
log.Info("execute ddl query",
zap.String("db", job.BinlogInfo.DBInfo.Name.String()),
zap.String("query", job.Query),
zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion))
}
return nil
}

func (rc *Client) setSpeedLimit() error {
if !rc.hasSpeedLimited && rc.rateLimit != 0 {
stores, err := rc.pdClient.GetAllStores(rc.ctx, pd.WithExcludeTombstone())
Expand Down Expand Up @@ -386,7 +421,7 @@ func (rc *Client) ValidateChecksum(
checksumResp.TotalBytes != table.TotalBytes {
log.Error("failed in validate checksum",
zap.String("database", table.Db.Name.L),
zap.String("table", table.Schema.Name.L),
zap.String("table", table.Info.Name.L),
zap.Uint64("origin tidb crc64", table.Crc64Xor),
zap.Uint64("calculated crc64", checksumResp.Checksum),
zap.Uint64("origin tidb total kvs", table.TotalKvs),
Expand Down
2 changes: 1 addition & 1 deletion pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) {
for i := len(tables) - 1; i >= 0; i-- {
tables[i] = &utils.Table{
Db: dbSchema,
Schema: &model.TableInfo{
Info: &model.TableInfo{
ID: int64(i),
Name: model.NewCIStr("test" + strconv.Itoa(i)),
Columns: []*model.ColumnInfo{{
Expand Down
Loading

0 comments on commit 45aa9c8

Please sign in to comment.