Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Incremental BR: support DDL #155

Merged
merged 14 commits into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,19 @@ func newBackupMetaCommand() *cobra.Command {
newTable := new(model.TableInfo)
tableID, _ := tableIDAllocator.Alloc()
newTable.ID = int64(tableID)
newTable.Name = table.Schema.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Schema.Indices))
for i, indexInfo := range table.Schema.Indices {
newTable.Name = table.Info.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Info.Indices))
for i, indexInfo := range table.Info.Indices {
indexID, _ := indexIDAllocator.Alloc()
newTable.Indices[i] = &model.IndexInfo{
ID: int64(indexID),
Name: indexInfo.Name,
}
}
rules := restore.GetRewriteRules(newTable, table.Schema, 0)
rules := restore.GetRewriteRules(newTable, table.Info, 0)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
tableIDMap[table.Schema.ID] = int64(tableID)
tableIDMap[table.Info.ID] = int64(tableID)
}
// Validate rewrite rules
for _, file := range files {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/onsi/gomega v1.7.1 // indirect
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/kvproto v0.0.0-20200108025604-a4dc183d2af5
github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20200109073933-a9496438d77d
github.com/pingcap/pd v1.1.0-beta.0.20191219054547-4d65bbefbc6d
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdc
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/aws/aws-sdk-go v1.26.1 h1:JGQggXhOiNJIqsmbYUl3cYtJZUffeOWlHtxfzGK7WPI=
github.com/aws/aws-sdk-go v1.26.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
Expand Down Expand Up @@ -182,6 +183,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.4.1/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.12.1 h1:zCy2xE9ablevUOrUZc3Dl72Dt+ya2FNAvC2yLYMHzi4=
github.com/grpc-ecosystem/grpc-gateway v1.12.1/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down Expand Up @@ -300,6 +303,8 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO
github.com/pingcap/kvproto v0.0.0-20191213111810-93cb7c623c8b/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200108025604-a4dc183d2af5 h1:RUxQExD5yubAjWGnw8kcxfO9abbiVHIE1rbuCyQCWDE=
github.com/pingcap/kvproto v0.0.0-20200108025604-a4dc183d2af5/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162 h1:lsoIoCoXMpcHvW6jHcqP/prA4I6duAp1DVyG2ULz4bM=
github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20200109073933-a9496438d77d h1:4QwSJRxmBjTB9ssJNWg2f2bDm5rqnHCUUjMh4N1QOOY=
Expand Down Expand Up @@ -342,6 +347,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237 h1:HQagqIiBmr8YXawX/le3+O26N+vPPC1PtjaF3mwnook=
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
Expand Down Expand Up @@ -496,6 +502,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191011234655-491137f69257 h1:ry8e2D+cwaV6hk7lb3aRTjjZo24shrbK0e11QEOkTIg=
golang.org/x/net v0.0.0-20191011234655-491137f69257/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -563,6 +570,7 @@ golang.org/x/tools v0.0.0-20191107010934-f79515f33823 h1:akkRBeitX2EZP59KdtKw310
golang.org/x/tools v0.0.0-20191107010934-f79515f33823/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2 h1:EtTFh6h4SAKemS+CURDMTDIANuduG5zKEXShyy18bGA=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFqUMtcTHnomIPS1iv3G4Ms=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200107184032-11e9d9cc0042 h1:BKiPVwWbEdmAh+5CBwk13CYeVJQRDJpDnKgDyMOGz9M=
golang.org/x/tools v0.0.0-20200107184032-11e9d9cc0042/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
Expand Down Expand Up @@ -596,6 +604,7 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20190905072037-92dd089d5514 h1:oFSK4421fpCKRrpzIpybyBVWyht05NegY9+L/3TLAZs=
google.golang.org/genproto v0.0.0-20190905072037-92dd089d5514/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9 h1:6XzpBoANz1NqMNfDXzc2QmHmbb1vyMsvRfoP5rM+K1I=
google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
Expand All @@ -605,6 +614,7 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
Expand All @@ -629,6 +639,7 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
55 changes: 53 additions & 2 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -119,15 +120,20 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend
}

// SaveBackupMeta saves the current backup meta at the given path.
func (bc *Client) SaveBackupMeta(ctx context.Context) error {
func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) error {
ddlJobsData, err := json.Marshal(ddlJobs)
if err != nil {
return errors.Trace(err)
}
bc.backupMeta.Ddls = ddlJobsData
backupMetaData, err := proto.Marshal(&bc.backupMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("backup meta",
zap.Reflect("meta", bc.backupMeta))
backendURL := storage.FormatBackendURL(bc.backend)
log.Info("save backup meta", zap.Stringer("path", &backendURL))
log.Info("save backup meta", zap.Stringer("path", &backendURL), zap.Int("jobs", len(ddlJobs)))
return bc.storage.Write(ctx, utils.MetaFile, backupMetaData)
}

Expand Down Expand Up @@ -241,6 +247,51 @@ func BuildBackupRangeAndSchema(
return ranges, backupSchemas, nil
}

// GetBackupDDLJobs returns the ddl jobs are done in (lastBackupTS, backupTS]
func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*model.Job, error) {
snapMeta, err := dom.GetSnapshotMeta(backupTS)
if err != nil {
return nil, errors.Trace(err)
}
lastSnapMeta, err := dom.GetSnapshotMeta(lastBackupTS)
if err != nil {
return nil, errors.Trace(err)
}
lastSchemaVersion, err := lastSnapMeta.GetSchemaVersion()
if err != nil {
return nil, errors.Trace(err)
}
allJobs := make([]*model.Job, 0)
defaultJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get default jobs", zap.Int("jobs", len(defaultJobs)))
allJobs = append(allJobs, defaultJobs...)
addIndexJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey)
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get add index jobs", zap.Int("jobs", len(addIndexJobs)))
allJobs = append(allJobs, addIndexJobs...)
historyJobs, err := snapMeta.GetAllHistoryDDLJobs()
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
allJobs = append(allJobs, historyJobs...)

completedJobs := make([]*model.Job, 0)
for _, job := range allJobs {
if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
completedJobs = append(completedJobs, job)
}
}
log.Debug("get completed jobs", zap.Int("jobs", len(completedJobs)))
return completedJobs, nil
}

// BackupRanges make a backup of the given key ranges.
func (bc *Client) BackupRanges(
ctx context.Context,
Expand Down
8 changes: 4 additions & 4 deletions pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func buildChecksumRequest(
reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1))
var oldTableID int64
if oldTable != nil {
oldTableID = oldTable.Schema.ID
oldTableID = oldTable.Info.ID
}
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS)
if err != nil {
Expand All @@ -72,7 +72,7 @@ func buildChecksumRequest(
for _, partDef := range partDefs {
var oldPartID int64
if oldTable != nil {
for _, oldPartDef := range oldTable.Schema.Partition.Definitions {
for _, oldPartDef := range oldTable.Info.Partition.Definitions {
if oldPartDef.Name == partDef.Name {
oldPartID = oldPartDef.ID
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func buildRequest(
}
var oldIndexInfo *model.IndexInfo
if oldTable != nil {
for _, oldIndex := range oldTable.Schema.Indices {
for _, oldIndex := range oldTable.Info.Indices {
if oldIndex.Name == indexInfo.Name {
oldIndexInfo = oldIndex
break
Expand All @@ -117,7 +117,7 @@ func buildRequest(
if oldIndexInfo == nil {
log.Panic("index not found",
zap.Reflect("table", tableInfo),
zap.Reflect("oldTable", oldTable.Schema),
zap.Reflect("oldTable", oldTable.Info),
zap.Stringer("index", indexInfo.Name))
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/checksum/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *testChecksumSuite) TestChecksum(c *C) {
// Test rewrite rules
tk.MustExec("alter table t1 add index i2(a);")
tableInfo1 = s.getTableInfo(c, "test", "t1")
oldTable := utils.Table{Schema: tableInfo1}
oldTable := utils.Table{Info: tableInfo1}
exe2, err = NewExecutorBuilder(tableInfo2, math.MaxUint64).
SetOldTable(&oldTable).Build()
c.Assert(err, IsNil)
Expand Down
47 changes: 44 additions & 3 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package restore

import (
"context"
"encoding/json"
"math"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -40,6 +42,7 @@ type Client struct {
tableWorkerPool *utils.WorkerPool

databases map[string]*utils.Database
ddlJobs []*model.Job
backupMeta *backup.BackupMeta
db *DB
rateLimit uint64
Expand Down Expand Up @@ -97,8 +100,15 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup.
if err != nil {
return errors.Trace(err)
}
var ddlJobs []*model.Job
err = json.Unmarshal(backupMeta.GetDdls(), &ddlJobs)
if err != nil {
return errors.Trace(err)
}
rc.databases = databases
rc.ddlJobs = ddlJobs
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

metaClient := NewSplitClient(rc.pdClient)
importClient := NewImportClient(metaClient)
Expand Down Expand Up @@ -151,6 +161,11 @@ func (rc *Client) GetDatabase(name string) *utils.Database {
return rc.databases[name]
}

// GetDDLJobs returns ddl jobs
func (rc *Client) GetDDLJobs() []*model.Job {
return rc.ddlJobs
}

// GetTableSchema returns the schema of a table from TiDB.
func (rc *Client) GetTableSchema(
dom *domain.Domain,
Expand Down Expand Up @@ -189,18 +204,44 @@ func (rc *Client) CreateTables(
if err != nil {
return nil, nil, err
}
newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Schema.Name)
newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Info.Name)
if err != nil {
return nil, nil, err
}
rules := GetRewriteRules(newTableInfo, table.Schema, newTS)
rules := GetRewriteRules(newTableInfo, table.Info, newTS)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
newTables = append(newTables, newTableInfo)
}
return rewriteRules, newTables, nil
}

// ExecDDLs executes the queries of the ddl jobs.
func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {
// Sort the ddl jobs by schema version in ascending order.
sort.Slice(ddlJobs, func(i, j int) bool {
return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion
})

for _, job := range ddlJobs {
log.Debug("pre-execute ddl jobs",
zap.String("db", job.SchemaName),
zap.String("query", job.Query),
zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion))
}
for _, job := range ddlJobs {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you merge these two loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log is not necessary, have removed it.

err := rc.db.ExecDDL(rc.ctx, job)
if err != nil {
return errors.Trace(err)
}
log.Info("execute ddl query",
zap.String("db", job.SchemaName),
zap.String("query", job.Query),
zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion))
}
return nil
}

func (rc *Client) setSpeedLimit() error {
if !rc.hasSpeedLimited && rc.rateLimit != 0 {
stores, err := rc.pdClient.GetAllStores(rc.ctx, pd.WithExcludeTombstone())
Expand Down Expand Up @@ -380,7 +421,7 @@ func (rc *Client) ValidateChecksum(
checksumResp.TotalBytes != table.TotalBytes {
log.Error("failed in validate checksum",
zap.String("database", table.Db.Name.L),
zap.String("table", table.Schema.Name.L),
zap.String("table", table.Info.Name.L),
zap.Uint64("origin tidb crc64", table.Crc64Xor),
zap.Uint64("calculated crc64", checksumResp.Checksum),
zap.Uint64("origin tidb total kvs", table.TotalKvs),
Expand Down
2 changes: 1 addition & 1 deletion pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) {
for i := len(tables) - 1; i >= 0; i-- {
tables[i] = &utils.Table{
Db: dbSchema,
Schema: &model.TableInfo{
Info: &model.TableInfo{
ID: int64(i),
Name: model.NewCIStr("test" + strconv.Itoa(i)),
Columns: []*model.ColumnInfo{{
Expand Down
Loading