Skip to content

Commit

Permalink
br: adaption for keyspace feature (pingcap#40532)
Browse files Browse the repository at this point in the history
  • Loading branch information
iosmanthus authored and ghazalfamilyusa committed Feb 6, 2023
1 parent 92b5af6 commit 97ef332
Show file tree
Hide file tree
Showing 18 changed files with 242 additions and 46 deletions.
3 changes: 0 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4831,7 +4831,6 @@ def go_deps():
sum = "h1:CYjC+xzdPvbV65gi6Dr4YowKcmLo045pm18L0DhdELM=",
version = "v0.2.0",
)

go_repository(
name = "com_google_cloud_go_gsuiteaddons",
build_file_proto_mode = "disable",
Expand Down Expand Up @@ -5044,7 +5043,6 @@ def go_deps():
sum = "h1:u6EznTGzIdsyOsvm+Xkw0aSuKFXQlyjGE9a4exk6iNQ=",
version = "v1.3.1",
)

go_repository(
name = "com_google_cloud_go_recaptchaenterprise_v2",
build_file_proto_mode = "disable",
Expand Down Expand Up @@ -5249,7 +5247,6 @@ def go_deps():
sum = "h1:/CsSTkbmO9HC8iQpxbK8ATms3OQaX3YQUeTMGCxlaK4=",
version = "v1.2.0",
)

go_repository(
name = "com_google_cloud_go_vision_v2",
build_file_proto_mode = "disable",
Expand Down
46 changes: 37 additions & 9 deletions br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type ExecutorBuilder struct {
oldTable *metautil.Table

concurrency uint

oldKeyspace []byte
newKeyspace []byte
}

// NewExecutorBuilder returns a new executor builder.
Expand All @@ -51,9 +54,26 @@ func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder {
return builder
}

func (builder *ExecutorBuilder) SetOldKeyspace(keyspace []byte) *ExecutorBuilder {
builder.oldKeyspace = keyspace
return builder
}

func (builder *ExecutorBuilder) SetNewKeyspace(keyspace []byte) *ExecutorBuilder {
builder.newKeyspace = keyspace
return builder
}

// Build builds a checksum executor.
func (builder *ExecutorBuilder) Build() (*Executor, error) {
reqs, err := buildChecksumRequest(builder.table, builder.oldTable, builder.ts, builder.concurrency)
reqs, err := buildChecksumRequest(
builder.table,
builder.oldTable,
builder.ts,
builder.concurrency,
builder.oldKeyspace,
builder.newKeyspace,
)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -65,6 +85,8 @@ func buildChecksumRequest(
oldTable *metautil.Table,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) ([]*kv.Request, error) {
var partDefs []model.PartitionDefinition
if part := newTable.Partition; part != nil {
Expand All @@ -76,7 +98,7 @@ func buildChecksumRequest(
if oldTable != nil {
oldTableID = oldTable.Info.ID
}
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS, concurrency)
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -91,7 +113,7 @@ func buildChecksumRequest(
}
}
}
rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS, concurrency)
rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -108,9 +130,11 @@ func buildRequest(
oldTableID int64,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) ([]*kv.Request, error) {
reqs := make([]*kv.Request, 0)
req, err := buildTableRequest(tableInfo, tableID, oldTable, oldTableID, startTS, concurrency)
req, err := buildTableRequest(tableInfo, tableID, oldTable, oldTableID, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -139,7 +163,7 @@ func buildRequest(
}
}
req, err = buildIndexRequest(
tableID, indexInfo, oldTableID, oldIndexInfo, startTS, concurrency)
tableID, indexInfo, oldTableID, oldIndexInfo, startTS, concurrency, oldKeyspace, newKeyspace)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -156,12 +180,14 @@ func buildTableRequest(
oldTableID int64,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) (*kv.Request, error) {
var rule *tipb.ChecksumRewriteRule
if oldTable != nil {
rule = &tipb.ChecksumRewriteRule{
OldPrefix: tablecodec.GenTableRecordPrefix(oldTableID),
NewPrefix: tablecodec.GenTableRecordPrefix(tableID),
OldPrefix: append(append([]byte{}, oldKeyspace...), tablecodec.GenTableRecordPrefix(oldTableID)...),
NewPrefix: append(append([]byte{}, newKeyspace...), tablecodec.GenTableRecordPrefix(tableID)...),
}
}

Expand Down Expand Up @@ -195,12 +221,14 @@ func buildIndexRequest(
oldIndexInfo *model.IndexInfo,
startTS uint64,
concurrency uint,
oldKeyspace []byte,
newKeyspace []byte,
) (*kv.Request, error) {
var rule *tipb.ChecksumRewriteRule
if oldIndexInfo != nil {
rule = &tipb.ChecksumRewriteRule{
OldPrefix: tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID),
NewPrefix: tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID),
OldPrefix: append(append([]byte{}, oldKeyspace...), tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)...),
NewPrefix: append(append([]byte{}, newKeyspace...), tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)...),
}
}
checksum := &tipb.ChecksumRequest{
Expand Down
1 change: 1 addition & 0 deletions br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//br/pkg/pdutil",
"//br/pkg/utils",
"//br/pkg/version",
"//config",
"//domain",
"//kv",
"@com_github_docker_go_units//:go-units",
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -174,7 +175,8 @@ func NewMgr(
}

// Disable GC because TiDB enables GC already.
storage, err := g.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddrs), securityOption)
path := fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", pdAddrs, config.GetGlobalKeyspaceName())
storage, err := g.Open(path, securityOption)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/metautil/metafile.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ type MetaReader struct {

// NewMetaReader creates MetaReader.
func NewMetaReader(
backpMeta *backuppb.BackupMeta,
backupMeta *backuppb.BackupMeta,
storage storage.ExternalStorage,
cipher *backuppb.CipherInfo) *MetaReader {
return &MetaReader{
storage: storage,
backupMeta: backpMeta,
backupMeta: backupMeta,
cipher: cipher,
}
}
Expand Down
25 changes: 21 additions & 4 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ type Client struct {

// the successfully preallocated table IDs.
preallocedTableIDs *tidalloc.PreallocIDs

// the rewrite mode of the downloaded SST files in TiKV.
rewriteMode RewriteMode
}

// NewRestoreClient returns a new RestoreClient.
Expand Down Expand Up @@ -317,6 +320,14 @@ func (rc *Client) GetBatchDdlSize() uint {
return rc.batchDdlSize
}

func (rc *Client) SetRewriteMode(mode RewriteMode) {
rc.rewriteMode = mode
}

func (rc *Client) GetRewriteMode() RewriteMode {
return rc.rewriteMode
}

// Close a client.
func (rc *Client) Close() {
// rc.db can be nil in raw kv mode.
Expand Down Expand Up @@ -346,7 +357,7 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool) {
metaClient := split.NewSplitClient(rc.pdClient, rc.tlsConf, isRawKvMode)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, rc.rewriteMode)
}

func (rc *Client) SetRawKVClient(c *RawKVBatchClient) {
Expand Down Expand Up @@ -870,7 +881,7 @@ func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Doma
}
})
if err != nil {
log.Error("create tables fail")
log.Error("create tables fail", zap.Error(err))
return err
}
for _, ct := range cts {
Expand Down Expand Up @@ -1042,7 +1053,7 @@ func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient,
rc.SetRateLimit(42)
rc.SetConcurrency(concurrency)
rc.hasSpeedLimited = false
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false)
rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, RewriteModeLegacy)
return rc.setSpeedLimit(ctx, rc.rateLimit)
}

Expand Down Expand Up @@ -1182,7 +1193,7 @@ func (rc *Client) RestoreSSTFiles(
zap.Duration("take", time.Since(fileStart)))
updateCh.Inc()
}()
return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher, rc.backupMeta.ApiVersion)
return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion())
})
}

Expand Down Expand Up @@ -1434,6 +1445,8 @@ func (rc *Client) execChecksum(
exe, err := checksum.NewExecutorBuilder(tbl.Table, startTS).
SetOldTable(tbl.OldTable).
SetConcurrency(concurrency).
SetOldKeyspace(tbl.RewriteRule.OldKeyspace).
SetNewKeyspace(tbl.RewriteRule.NewKeyspace).
Build()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -2762,6 +2775,10 @@ func TidyOldSchemas(sr *stream.SchemasReplace) *backup.Schemas {
return schemas
}

func CheckKeyspaceBREnable(ctx context.Context, pdClient pd.Client) error {
return version.CheckClusterVersion(ctx, pdClient, version.CheckVersionForKeyspaceBR)
}

func CheckNewCollationEnable(
backupNewCollationEnable string,
g glue.Glue,
Expand Down
Loading

0 comments on commit 97ef332

Please sign in to comment.