br: Compatibility problem between br and TiDB for DDLs about attribute in Incremental Backup (#29360)
joccau authored Nov 4, 2021
1 parent 86caab9 commit d80efa5
Showing 2 changed files with 88 additions and 0 deletions.
22 changes: 22 additions & 0 deletions br/pkg/backup/client.go
@@ -356,6 +356,24 @@ func BuildBackupRangeAndSchema(
return ranges, backupSchemas, nil
}

func skipUnsupportedDDLJob(job *model.Job) bool {
switch job.Type {
// TiDB V5.3.0 supports TableAttributes and TablePartitionAttributes.
// Backup guarantees data integrity but not region placement, which is out of scope for backup.
case model.ActionCreatePlacementPolicy,
model.ActionAlterPlacementPolicy,
model.ActionDropPlacementPolicy,
model.ActionAlterTablePartitionPolicy,
model.ActionModifySchemaDefaultPlacement,
model.ActionAlterTablePlacement,
model.ActionAlterTableAttributes,
model.ActionAlterTablePartitionAttributes:
return true
default:
return false
}
}

// WriteBackupDDLJobs sends the DDL jobs that are done in (lastBackupTS, backupTS] to metaWriter.
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastBackupTS, backupTS uint64) error {
snapshot := store.GetSnapshot(kv.NewVersion(backupTS))
@@ -388,6 +406,10 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB

count := 0
for _, job := range allJobs {
if skipUnsupportedDDLJob(job) {
continue
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
jobBytes, err := json.Marshal(job)
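For illustration (not part of the commit), here is a minimal, self-contained sketch of how the new filter classifies DDL jobs. It mirrors skipUnsupportedDDLJob above; the model.Job type and the Action* constants come from github.com/pingcap/tidb/parser/model:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/parser/model"
)

// isUnsupported mirrors skipUnsupportedDDLJob: placement- and attribute-related
// jobs are excluded from incremental backup metadata, since backup guarantees
// data integrity but not region placement.
func isUnsupported(job *model.Job) bool {
	switch job.Type {
	case model.ActionCreatePlacementPolicy,
		model.ActionAlterPlacementPolicy,
		model.ActionDropPlacementPolicy,
		model.ActionAlterTablePartitionPolicy,
		model.ActionModifySchemaDefaultPlacement,
		model.ActionAlterTablePlacement,
		model.ActionAlterTableAttributes,
		model.ActionAlterTablePartitionAttributes:
		return true
	}
	return false
}

func main() {
	jobs := []*model.Job{
		{Type: model.ActionCreateTable},                   // kept
		{Type: model.ActionAlterTableAttributes},          // skipped
		{Type: model.ActionAlterTablePartitionAttributes}, // skipped
		{Type: model.ActionTruncateTable},                 // kept
	}
	kept := 0
	for _, job := range jobs {
		if isUnsupported(job) {
			continue
		}
		kept++
	}
	fmt.Println("DDL jobs written to backup meta:", kept) // 2
}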
66 changes: 66 additions & 0 deletions br/pkg/backup/client_test.go
@@ -4,22 +4,28 @@ package backup_test

import (
"context"
"encoding/json"
"math"
"testing"
"time"

"github.com/golang/protobuf/proto"
. "github.com/pingcap/check"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/tidb/br/pkg/backup"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/testkit"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
@@ -33,6 +39,9 @@ type testBackup struct {

mockPDClient pd.Client
backupClient *backup.Client

cluster *mock.Cluster
storage storage.ExternalStorage
}

var _ = Suite(&testBackup{})
@@ -51,6 +60,14 @@ func (r *testBackup) SetUpSuite(c *C) {
mockMgr.SetHTTP([]string{"test"}, nil)
r.backupClient, err = backup.NewBackupClient(r.ctx, mockMgr)
c.Assert(err, IsNil)

r.cluster, err = mock.NewCluster()
c.Assert(err, IsNil)
base := c.MkDir()
r.storage, err = storage.NewLocalStorage(base)
c.Assert(err, IsNil)
// c.Assert(r.cluster.Start(), IsNil)

}

func (r *testBackup) TestGetTS(c *C) {
@@ -269,3 +286,52 @@ func (r *testBackup) TestSendCreds(c *C) {
secret_access_key = backend.GetS3().SecretAccessKey
c.Assert(secret_access_key, Equals, "")
}

func (r *testBackup) TestskipUnsupportedDDLJob(c *C) {
tk := testkit.NewTestKit(c, r.cluster.Storage)
tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")
lastTS, err := r.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
c.Assert(err, IsNil, Commentf("Error get last ts: %s", err))
tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
tk.MustExec("DROP TABLE test_db.test_table1;")
tk.MustExec("DROP DATABASE test_db;")
tk.MustExec("CREATE DATABASE test_db;")
tk.MustExec("USE test_db;")
tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
tk.MustExec("RENAME TABLE test_table1 to test_table;")
tk.MustExec("TRUNCATE TABLE test_table;")

tk.MustExec("CREATE TABLE tb(id INT NOT NULL, stu_id INT NOT NULL) " +
"PARTITION BY RANGE (stu_id) (PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11))")
tk.MustExec("ALTER TABLE tb attributes \"merge_option=allow\"")
tk.MustExec("ALTER TABLE tb PARTITION p0 attributes \"merge_option=deny\"")

ts, err := r.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
c.Assert(err, IsNil, Commentf("Error get ts: %s", err))

cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT}
metaWriter := metautil.NewMetaWriter(r.storage, metautil.MetaFileSize, false, &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, r.cluster.Storage, lastTS, ts)
c.Assert(err, IsNil, Commentf("Error get ddl jobs: %s", err))
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
c.Assert(err, IsNil, Commentf("Flush failed", err))
err = metaWriter.FlushBackupMeta(ctx)
c.Assert(err, IsNil, Commentf("Finially flush backupmeta failed", err))

metaBytes, err := r.storage.ReadFile(ctx, metautil.MetaFile)
c.Assert(err, IsNil)
mockMeta := &backuppb.BackupMeta{}
err = proto.Unmarshal(metaBytes, mockMeta)
c.Assert(err, IsNil)
// check the schema version
metaReader := metautil.NewMetaReader(mockMeta, r.storage, &cipher)
allDDLJobsBytes, err := metaReader.ReadDDLs(ctx)
c.Assert(err, IsNil)
var allDDLJobs []*model.Job
err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs)
c.Assert(err, IsNil)
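// Ten DDL jobs ran after lastTS; the two ALTER ... ATTRIBUTES jobs are filtered out, leaving 8.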
c.Assert(len(allDDLJobs), Equals, 8)
}
