Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Incremental BR: support DDL #155

Merged
merged 14 commits into from
Feb 20, 2020
5 changes: 0 additions & 5 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,6 @@ func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*mod

completedJobs := make([]*model.Job, 0)
for _, job := range allJobs {
log.Debug("get job",
zap.String("query", job.Query),
zap.Int64("schemaVersion", job.BinlogInfo.SchemaVersion),
zap.Int64("lastSchemaVersion", lastSchemaVersion),
zap.Stringer("state", job.State))
if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
completedJobs = append(completedJobs, job)
Expand Down
31 changes: 26 additions & 5 deletions pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -104,10 +105,17 @@ SplitRegions:
}
for regionID, keys := range splitKeyMap {
var newRegions []*RegionInfo
newRegions, err = rs.splitAndScatterRegions(ctx, regionMap[regionID], keys)
region := regionMap[regionID]
newRegions, err = rs.splitAndScatterRegions(ctx, region, keys)
if err != nil {
if strings.Contains(err.Error(), "no valid key") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can it occur? Could you add some comments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The record backup range has appended a "\000" to the end key, TiKV truncates this suffix and BR does not. Will fixed it in this PR.

continue
for _, key := range keys {
log.Error("no valid key",
zap.Binary("startKey", region.Region.StartKey),
zap.Binary("endKey", region.Region.EndKey),
zap.Binary("key", codec.EncodeBytes([]byte{}, key)))
}
return errors.Trace(err)
}
interval = 2 * interval
if interval > SplitMaxRetryInterval {
Expand All @@ -119,12 +127,13 @@ SplitRegions:
}
continue SplitRegions
}
log.Debug("split regions", zap.Stringer("region", region.Region), zap.ByteStrings("keys", keys))
scatterRegions = append(scatterRegions, newRegions...)
onSplit(keys)
}
break
}
if err != nil && !strings.Contains(err.Error(), "no valid key") {
if err != nil {
return errors.Trace(err)
}
log.Info("splitting regions done, wait for scattering regions",
Expand Down Expand Up @@ -254,7 +263,7 @@ func getSplitKeys(rewriteRules *RewriteRules, ranges []Range, regions []*RegionI
checkKeys = append(checkKeys, rule.GetNewKeyPrefix())
}
for _, rg := range ranges {
checkKeys = append(checkKeys, rg.EndKey)
checkKeys = append(checkKeys, truncateRowKey(rg.EndKey))
}
for _, key := range checkKeys {
if region := needSplit(key, regions); region != nil {
Expand All @@ -263,7 +272,10 @@ func getSplitKeys(rewriteRules *RewriteRules, ranges []Range, regions []*RegionI
splitKeys = make([][]byte, 0, 1)
}
splitKeyMap[region.Region.GetId()] = append(splitKeys, key)
log.Debug("get key for split region", zap.Binary("key", key), zap.Stringer("region", region.Region))
log.Debug("get key for split region",
zap.Binary("key", key),
zap.Binary("startKey", region.Region.StartKey),
zap.Binary("endKey", region.Region.EndKey))
}
}
return splitKeyMap
Expand All @@ -289,6 +301,15 @@ func needSplit(splitKey []byte, regions []*RegionInfo) *RegionInfo {
return nil
}

func truncateRowKey(key []byte) []byte {
if bytes.HasPrefix(key, []byte("t")) &&
len(key) > tablecodec.RecordRowKeyLen &&
bytes.HasPrefix(key[9:], []byte("_r")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

9 looks like a magic number, could you add some comments to explain it?

return key[:tablecodec.RecordRowKeyLen]
}
return key
}

func beforeEnd(key []byte, end []byte) bool {
return bytes.Compare(key, end) < 0 || len(end) == 0
}
Expand Down
25 changes: 24 additions & 1 deletion pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func validateRegions(regions map[uint64]*RegionInfo) bool {
return false
}
FindRegion:
for i := 1; i < 12; i++ {
for i := 1; i < len(keys); i++ {
for _, region := range regions {
startKey := []byte(keys[i-1])
if len(startKey) != 0 {
Expand All @@ -299,3 +299,26 @@ FindRegion:
}
return true
}

func (s *testRestoreUtilSuite) TestNeedSplit(c *C) {
regions := []*RegionInfo{
{
Region: &metapb.Region{
StartKey: codec.EncodeBytes([]byte{}, []byte("b")),
EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
},
},
}
// Out of region
c.Assert(needSplit([]byte("a"), regions), IsNil)
// Region start key
c.Assert(needSplit([]byte("b"), regions), IsNil)
// In region
region := needSplit([]byte("c"), regions)
c.Assert(region.Region.GetStartKey(), Equals, codec.EncodeBytes([]byte{}, []byte("b")))
c.Assert(region.Region.GetEndKey(), Equals, codec.EncodeBytes([]byte{}, []byte("d")))
// Region end key
c.Assert(needSplit([]byte("d"), regions), IsNil)
// Out of region
c.Assert(needSplit([]byte("e"), regions), IsNil)
}