diff --git a/DEPS.bzl b/DEPS.bzl index 244ae01e489fe..64b86db81c126 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -4831,7 +4831,6 @@ def go_deps(): sum = "h1:CYjC+xzdPvbV65gi6Dr4YowKcmLo045pm18L0DhdELM=", version = "v0.2.0", ) - go_repository( name = "com_google_cloud_go_gsuiteaddons", build_file_proto_mode = "disable", @@ -5044,7 +5043,6 @@ def go_deps(): sum = "h1:u6EznTGzIdsyOsvm+Xkw0aSuKFXQlyjGE9a4exk6iNQ=", version = "v1.3.1", ) - go_repository( name = "com_google_cloud_go_recaptchaenterprise_v2", build_file_proto_mode = "disable", @@ -5249,7 +5247,6 @@ def go_deps(): sum = "h1:/CsSTkbmO9HC8iQpxbK8ATms3OQaX3YQUeTMGCxlaK4=", version = "v1.2.0", ) - go_repository( name = "com_google_cloud_go_vision_v2", build_file_proto_mode = "disable", diff --git a/br/pkg/checksum/executor.go b/br/pkg/checksum/executor.go index c30ae49fccdca..300caf706ef25 100644 --- a/br/pkg/checksum/executor.go +++ b/br/pkg/checksum/executor.go @@ -27,6 +27,9 @@ type ExecutorBuilder struct { oldTable *metautil.Table concurrency uint + + oldKeyspace []byte + newKeyspace []byte } // NewExecutorBuilder returns a new executor builder. @@ -51,9 +54,26 @@ func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder { return builder } +func (builder *ExecutorBuilder) SetOldKeyspace(keyspace []byte) *ExecutorBuilder { + builder.oldKeyspace = keyspace + return builder +} + +func (builder *ExecutorBuilder) SetNewKeyspace(keyspace []byte) *ExecutorBuilder { + builder.newKeyspace = keyspace + return builder +} + // Build builds a checksum executor. func (builder *ExecutorBuilder) Build() (*Executor, error) { - reqs, err := buildChecksumRequest(builder.table, builder.oldTable, builder.ts, builder.concurrency) + reqs, err := buildChecksumRequest( + builder.table, + builder.oldTable, + builder.ts, + builder.concurrency, + builder.oldKeyspace, + builder.newKeyspace, + ) if err != nil { return nil, errors.Trace(err) } @@ -65,6 +85,8 @@ func buildChecksumRequest( oldTable *metautil.Table, startTS uint64, concurrency uint, + oldKeyspace []byte, + newKeyspace []byte, ) ([]*kv.Request, error) { var partDefs []model.PartitionDefinition if part := newTable.Partition; part != nil { @@ -76,7 +98,7 @@ func buildChecksumRequest( if oldTable != nil { oldTableID = oldTable.Info.ID } - rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS, concurrency) + rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS, concurrency, oldKeyspace, newKeyspace) if err != nil { return nil, errors.Trace(err) } @@ -91,7 +113,7 @@ func buildChecksumRequest( } } } - rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS, concurrency) + rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS, concurrency, oldKeyspace, newKeyspace) if err != nil { return nil, errors.Trace(err) } @@ -108,9 +130,11 @@ func buildRequest( oldTableID int64, startTS uint64, concurrency uint, + oldKeyspace []byte, + newKeyspace []byte, ) ([]*kv.Request, error) { reqs := make([]*kv.Request, 0) - req, err := buildTableRequest(tableInfo, tableID, oldTable, oldTableID, startTS, concurrency) + req, err := buildTableRequest(tableInfo, tableID, oldTable, oldTableID, startTS, concurrency, oldKeyspace, newKeyspace) if err != nil { return nil, errors.Trace(err) } @@ -139,7 +163,7 @@ func buildRequest( } } req, err = buildIndexRequest( - tableID, indexInfo, oldTableID, oldIndexInfo, startTS, concurrency) + tableID, indexInfo, oldTableID, oldIndexInfo, startTS, concurrency, oldKeyspace, newKeyspace) if err != nil { return nil, errors.Trace(err) } @@ -156,12 +180,14 @@ func buildTableRequest( oldTableID int64, startTS uint64, concurrency uint, + oldKeyspace []byte, + newKeyspace []byte, ) (*kv.Request, error) { var rule *tipb.ChecksumRewriteRule if oldTable != nil { rule = &tipb.ChecksumRewriteRule{ - OldPrefix: tablecodec.GenTableRecordPrefix(oldTableID), - NewPrefix: tablecodec.GenTableRecordPrefix(tableID), + OldPrefix: append(append([]byte{}, oldKeyspace...), tablecodec.GenTableRecordPrefix(oldTableID)...), + NewPrefix: append(append([]byte{}, newKeyspace...), tablecodec.GenTableRecordPrefix(tableID)...), } } @@ -195,12 +221,14 @@ func buildIndexRequest( oldIndexInfo *model.IndexInfo, startTS uint64, concurrency uint, + oldKeyspace []byte, + newKeyspace []byte, ) (*kv.Request, error) { var rule *tipb.ChecksumRewriteRule if oldIndexInfo != nil { rule = &tipb.ChecksumRewriteRule{ - OldPrefix: tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID), - NewPrefix: tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID), + OldPrefix: append(append([]byte{}, oldKeyspace...), tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)...), + NewPrefix: append(append([]byte{}, newKeyspace...), tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)...), } } checksum := &tipb.ChecksumRequest{ diff --git a/br/pkg/conn/BUILD.bazel b/br/pkg/conn/BUILD.bazel index fc88f174394f3..da06e516f37ac 100644 --- a/br/pkg/conn/BUILD.bazel +++ b/br/pkg/conn/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//br/pkg/pdutil", "//br/pkg/utils", "//br/pkg/version", + "//config", "//domain", "//kv", "@com_github_docker_go_units//:go-units", diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 157b9cdf794c9..187bc5ef1db79 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/tikv/client-go/v2/oracle" @@ -174,7 +175,8 @@ func NewMgr( } // Disable GC because TiDB enables GC already. - storage, err := g.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddrs), securityOption) + path := fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", pdAddrs, config.GetGlobalKeyspaceName()) + storage, err := g.Open(path, securityOption) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/metautil/metafile.go b/br/pkg/metautil/metafile.go index 77b3c4de8b6f6..76ac9a248ccf1 100644 --- a/br/pkg/metautil/metafile.go +++ b/br/pkg/metautil/metafile.go @@ -170,12 +170,12 @@ type MetaReader struct { // NewMetaReader creates MetaReader. func NewMetaReader( - backpMeta *backuppb.BackupMeta, + backupMeta *backuppb.BackupMeta, storage storage.ExternalStorage, cipher *backuppb.CipherInfo) *MetaReader { return &MetaReader{ storage: storage, - backupMeta: backpMeta, + backupMeta: backupMeta, cipher: cipher, } } diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 18a3dc61879e4..2d0ae75408ebb 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -169,6 +169,9 @@ type Client struct { // the successfully preallocated table IDs. preallocedTableIDs *tidalloc.PreallocIDs + + // the rewrite mode of the downloaded SST files in TiKV. + rewriteMode RewriteMode } // NewRestoreClient returns a new RestoreClient. @@ -317,6 +320,14 @@ func (rc *Client) GetBatchDdlSize() uint { return rc.batchDdlSize } +func (rc *Client) SetRewriteMode(mode RewriteMode) { + rc.rewriteMode = mode +} + +func (rc *Client) GetRewriteMode() RewriteMode { + return rc.rewriteMode +} + // Close a client. func (rc *Client) Close() { // rc.db can be nil in raw kv mode. @@ -346,7 +357,7 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool) { metaClient := split.NewSplitClient(rc.pdClient, rc.tlsConf, isRawKvMode) importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf) - rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode) + rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, rc.rewriteMode) } func (rc *Client) SetRawKVClient(c *RawKVBatchClient) { @@ -870,7 +881,7 @@ func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Doma } }) if err != nil { - log.Error("create tables fail") + log.Error("create tables fail", zap.Error(err)) return err } for _, ct := range cts { @@ -1042,7 +1053,7 @@ func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient, rc.SetRateLimit(42) rc.SetConcurrency(concurrency) rc.hasSpeedLimited = false - rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false) + rc.fileImporter = NewFileImporter(nil, fakeImportClient, nil, false, RewriteModeLegacy) return rc.setSpeedLimit(ctx, rc.rateLimit) } @@ -1182,7 +1193,7 @@ func (rc *Client) RestoreSSTFiles( zap.Duration("take", time.Since(fileStart))) updateCh.Inc() }() - return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher, rc.backupMeta.ApiVersion) + return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion()) }) } @@ -1434,6 +1445,8 @@ func (rc *Client) execChecksum( exe, err := checksum.NewExecutorBuilder(tbl.Table, startTS). SetOldTable(tbl.OldTable). SetConcurrency(concurrency). + SetOldKeyspace(tbl.RewriteRule.OldKeyspace). + SetNewKeyspace(tbl.RewriteRule.NewKeyspace). Build() if err != nil { return errors.Trace(err) @@ -2762,6 +2775,10 @@ func TidyOldSchemas(sr *stream.SchemasReplace) *backup.Schemas { return schemas } +func CheckKeyspaceBREnable(ctx context.Context, pdClient pd.Client) error { + return version.CheckClusterVersion(ctx, pdClient, version.CheckVersionForKeyspaceBR) +} + func CheckNewCollationEnable( backupNewCollationEnable string, g glue.Glue, diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go index 34da5cbea2404..dd6ae97cf79c1 100644 --- a/br/pkg/restore/import.go +++ b/br/pkg/restore/import.go @@ -48,6 +48,17 @@ const ( gRPCBackOffMaxDelay = 3 * time.Second ) +// RewriteMode is a mode flag that tells the TiKV how to handle the rewrite rules. +type RewriteMode int + +const ( + // RewriteModeLegacy means no rewrite rule is applied. + RewriteModeLegacy RewriteMode = iota + + // RewriteModeKeyspace means the rewrite rule could be applied to keyspace. + RewriteModeKeyspace +) + // ImporterClient is used to import a file to TiKV. type ImporterClient interface { ClearFiles( @@ -250,6 +261,7 @@ type FileImporter struct { rawStartKey []byte rawEndKey []byte supportMultiIngest bool + rewriteMode RewriteMode cacheKey string } @@ -260,12 +272,14 @@ func NewFileImporter( importClient ImporterClient, backend *backuppb.StorageBackend, isRawKvMode bool, + rewriteMode RewriteMode, ) FileImporter { return FileImporter{ metaClient: metaClient, backend: backend, importClient: importClient, isRawKvMode: isRawKvMode, + rewriteMode: rewriteMode, cacheKey: fmt.Sprintf("BR-%s-%d", time.Now().Format("20060102150405"), rand.Int63()), } } @@ -581,7 +595,7 @@ func (importer *FileImporter) download( if importer.isRawKvMode { downloadMeta, e = importer.downloadRawKVSST(ctx, regionInfo, f, cipher, apiVersion) } else { - downloadMeta, e = importer.downloadSST(ctx, regionInfo, f, rewriteRules, cipher) + downloadMeta, e = importer.downloadSST(ctx, regionInfo, f, rewriteRules, cipher, apiVersion) } failpoint.Inject("restore-storage-error", func(val failpoint.Value) { @@ -598,7 +612,7 @@ func (importer *FileImporter) download( if importer.isRawKvMode { downloadMeta, e = importer.downloadRawKVSST(ctx, regionInfo, f, nil, apiVersion) } else { - downloadMeta, e = importer.downloadSST(ctx, regionInfo, f, rewriteRules, nil) + downloadMeta, e = importer.downloadSST(ctx, regionInfo, f, rewriteRules, nil, apiVersion) } } @@ -622,6 +636,7 @@ func (importer *FileImporter) downloadSST( file *backuppb.File, rewriteRules *RewriteRules, cipher *backuppb.CipherInfo, + apiVersion kvrpcpb.APIVersion, ) (*import_sstpb.SSTMeta, error) { uid := uuid.New() id := uid[:] @@ -630,22 +645,38 @@ func (importer *FileImporter) downloadSST( if fileRule == nil { return nil, errors.Trace(berrors.ErrKVRewriteRuleNotFound) } - rule := import_sstpb.RewriteRule{ - OldKeyPrefix: encodeKeyPrefix(fileRule.GetOldKeyPrefix()), - NewKeyPrefix: encodeKeyPrefix(fileRule.GetNewKeyPrefix()), + + // For the legacy version of TiKV, we need to encode the key prefix, since in the legacy + // version, the TiKV will rewrite the key with the encoded prefix without decoding the keys in + // the SST file. For the new version of TiKV that support keyspace rewrite, we don't need to + // encode the key prefix. The TiKV will decode the keys in the SST file and rewrite the keys + // with the plain prefix and encode the keys before writing to SST. + + // for the keyspace rewrite mode + rule := *fileRule + // for the legacy rewrite mode + if importer.rewriteMode == RewriteModeLegacy { + rule.OldKeyPrefix = encodeKeyPrefix(fileRule.GetOldKeyPrefix()) + rule.NewKeyPrefix = encodeKeyPrefix(fileRule.GetNewKeyPrefix()) + } + + sstMeta, err := GetSSTMetaFromFile(id, file, regionInfo.Region, &rule, importer.rewriteMode) + if err != nil { + return nil, err } - sstMeta := GetSSTMetaFromFile(id, file, regionInfo.Region, &rule) req := &import_sstpb.DownloadRequest{ - Sst: sstMeta, + Sst: *sstMeta, StorageBackend: importer.backend, Name: file.GetName(), RewriteRule: rule, CipherInfo: cipher, StorageCacheId: importer.cacheKey, + // For the older version of TiDB, the request type will be default to `import_sstpb.RequestType_Legacy` + RequestType: import_sstpb.DownloadRequestType_Keyspace, } log.Debug("download SST", - logutil.SSTMeta(&sstMeta), + logutil.SSTMeta(sstMeta), logutil.File(file), logutil.Region(regionInfo.Region), logutil.Leader(regionInfo.Leader), @@ -671,7 +702,7 @@ func (importer *FileImporter) downloadSST( logutil.Region(regionInfo.Region), logutil.Peer(peer), logutil.Key("resp-range-start", resp.Range.Start), - logutil.Key("resp-range-end", resp.Range.Start), + logutil.Key("resp-range-end", resp.Range.End), zap.Bool("resp-isempty", resp.IsEmpty), zap.Uint32("resp-crc32", resp.Crc32), ) @@ -686,7 +717,8 @@ func (importer *FileImporter) downloadSST( downloadResp := atomicResp.Load().(*import_sstpb.DownloadResponse) sstMeta.Range.Start = TruncateTS(downloadResp.Range.GetStart()) sstMeta.Range.End = TruncateTS(downloadResp.Range.GetEnd()) - return &sstMeta, nil + sstMeta.ApiVersion = apiVersion + return sstMeta, nil } func (importer *FileImporter) downloadRawKVSST( @@ -700,7 +732,10 @@ func (importer *FileImporter) downloadRawKVSST( id := uid[:] // Empty rule var rule import_sstpb.RewriteRule - sstMeta := GetSSTMetaFromFile(id, file, regionInfo.Region, &rule) + sstMeta, err := GetSSTMetaFromFile(id, file, regionInfo.Region, &rule, RewriteModeLegacy) + if err != nil { + return nil, err + } // Cut the SST file's range to fit in the restoring range. if bytes.Compare(importer.rawStartKey, sstMeta.Range.GetStart()) > 0 { @@ -716,7 +751,7 @@ func (importer *FileImporter) downloadRawKVSST( } req := &import_sstpb.DownloadRequest{ - Sst: sstMeta, + Sst: *sstMeta, StorageBackend: importer.backend, Name: file.GetName(), RewriteRule: rule, @@ -724,7 +759,7 @@ func (importer *FileImporter) downloadRawKVSST( CipherInfo: cipher, StorageCacheId: importer.cacheKey, } - log.Debug("download SST", logutil.SSTMeta(&sstMeta), logutil.Region(regionInfo.Region)) + log.Debug("download SST", logutil.SSTMeta(sstMeta), logutil.Region(regionInfo.Region)) var atomicResp atomic.Value eg, ectx := errgroup.WithContext(ctx) @@ -755,7 +790,7 @@ func (importer *FileImporter) downloadRawKVSST( sstMeta.Range.Start = downloadResp.Range.GetStart() sstMeta.Range.End = downloadResp.Range.GetEnd() sstMeta.ApiVersion = apiVersion - return &sstMeta, nil + return sstMeta, nil } func (importer *FileImporter) ingest( diff --git a/br/pkg/restore/range.go b/br/pkg/restore/range.go index 72a76105dd440..6f3fa76325b0c 100644 --- a/br/pkg/restore/range.go +++ b/br/pkg/restore/range.go @@ -72,7 +72,9 @@ func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range // RewriteRules contains rules for rewriting keys of tables. type RewriteRules struct { - Data []*import_sstpb.RewriteRule + Data []*import_sstpb.RewriteRule + OldKeyspace []byte + NewKeyspace []byte } // Append append its argument to this rewrite rules. diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index 73a4411c445c1..b29ef6674c74f 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -196,7 +196,25 @@ func GetSSTMetaFromFile( file *backuppb.File, region *metapb.Region, regionRule *import_sstpb.RewriteRule, -) import_sstpb.SSTMeta { + rewriteMode RewriteMode, +) (meta *import_sstpb.SSTMeta, err error) { + r := *region + // If the rewrite mode is for keyspace, then the region bound should be decoded. + if rewriteMode == RewriteModeKeyspace { + if len(region.GetStartKey()) > 0 { + _, r.StartKey, err = codec.DecodeBytes(region.GetStartKey(), nil) + if err != nil { + return + } + } + if len(region.GetEndKey()) > 0 { + _, r.EndKey, err = codec.DecodeBytes(region.GetEndKey(), nil) + if err != nil { + return + } + } + } + // Get the column family of the file by the file name. var cfName string if strings.Contains(file.GetName(), defaultCFName) { @@ -208,8 +226,8 @@ func GetSSTMetaFromFile( // Here we rewrites the keys to compare with the keys of the region. rangeStart := regionRule.GetNewKeyPrefix() // rangeStart = max(rangeStart, region.StartKey) - if bytes.Compare(rangeStart, region.GetStartKey()) < 0 { - rangeStart = region.GetStartKey() + if bytes.Compare(rangeStart, r.GetStartKey()) < 0 { + rangeStart = r.GetStartKey() } // Append 10 * 0xff to make sure rangeEnd cover all file key @@ -219,8 +237,8 @@ func GetSSTMetaFromFile( suffix := []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} rangeEnd := append(append([]byte{}, regionRule.GetNewKeyPrefix()...), suffix...) // rangeEnd = min(rangeEnd, region.EndKey) - if len(region.GetEndKey()) > 0 && bytes.Compare(rangeEnd, region.GetEndKey()) > 0 { - rangeEnd = region.GetEndKey() + if len(r.GetEndKey()) > 0 && bytes.Compare(rangeEnd, r.GetEndKey()) > 0 { + rangeEnd = r.GetEndKey() } if bytes.Compare(rangeStart, rangeEnd) > 0 { @@ -235,7 +253,7 @@ func GetSSTMetaFromFile( logutil.Key("startKey", rangeStart), logutil.Key("endKey", rangeEnd)) - return import_sstpb.SSTMeta{ + return &import_sstpb.SSTMeta{ Uuid: id, CfName: cfName, Range: &import_sstpb.Range{ @@ -246,7 +264,7 @@ func GetSSTMetaFromFile( RegionId: region.GetId(), RegionEpoch: region.GetRegionEpoch(), CipherIv: file.GetCipherIv(), - } + }, nil } // makeDBPool makes a session pool with specficated size by sessionFactory. diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go index 482818a1ad958..4af6922efa30a 100644 --- a/br/pkg/restore/util_test.go +++ b/br/pkg/restore/util_test.go @@ -52,7 +52,8 @@ func TestGetSSTMetaFromFile(t *testing.T) { StartKey: []byte("t2abc"), EndKey: []byte("t3a"), } - sstMeta := restore.GetSSTMetaFromFile([]byte{}, file, region, rule) + sstMeta, err := restore.GetSSTMetaFromFile([]byte{}, file, region, rule, restore.RewriteModeLegacy) + require.Nil(t, err) require.Equal(t, "t2abc", string(sstMeta.GetRange().GetStart())) require.Equal(t, "t2\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff", string(sstMeta.GetRange().GetEnd())) } diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 979afd1ba9110..0ad8cde87137e 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -67,6 +67,7 @@ go_library( "@com_github_spf13_pflag//:pflag", "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//:client", "@com_google_cloud_go_storage//:storage", "@io_etcd_go_etcd_client_pkg_v3//transport", diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 0033324037e90..c46e8e24ab4a8 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics/handle" @@ -49,6 +50,7 @@ const ( flagIgnoreStats = "ignore-stats" flagUseBackupMetaV2 = "use-backupmeta-v2" flagUseCheckpoint = "use-checkpoint" + flagKeyspaceName = "keyspace-name" flagGCTTL = "gcttl" @@ -124,6 +126,8 @@ func DefineBackupFlags(flags *pflag.FlagSet) { flags.Bool(flagUseBackupMetaV2, false, "use backup meta v2 to store meta info") + + flags.String(flagKeyspaceName, "", "keyspace name for backup") // This flag will change the structure of backupmeta. // we must make sure the old three version of br can parse the v2 meta to keep compatibility. // so this flag should set to false for three version by default. @@ -206,6 +210,10 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } + cfg.KeyspaceName, err = flags.GetString(flagKeyspaceName) + if err != nil { + return errors.Trace(err) + } if flags.Lookup(flagFullBackupType) != nil { // for backup full @@ -322,6 +330,9 @@ func isFullBackup(cmdName string) bool { // RunBackup starts a backup task inside the current goroutine. func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig) error { cfg.Adjust() + config.UpdateGlobal(func(conf *config.Config) { + conf.KeyspaceName = cfg.KeyspaceName + }) defer summary.Summary(cmdName) ctx, cancel := context.WithCancel(c) @@ -485,6 +496,11 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig if err != nil { return errors.Trace(err) } + // Add keyspace prefix to BackupRequest + for i := range ranges { + start, end := ranges[i].StartKey, ranges[i].EndKey + ranges[i].StartKey, ranges[i].EndKey = mgr.GetStorage().GetCodec().EncodeRange(start, end) + } // Metafile size should be less than 64MB. metawriter := metautil.NewMetaWriter(client.GetStorage(), @@ -498,6 +514,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig m.ClusterVersion = clusterVersion m.BrVersion = brVersion m.NewCollationsEnabled = newCollationEnable + m.ApiVersion = mgr.GetStorage().GetCodec().GetAPIVersion() }) log.Info("get placement policies", zap.Int("count", len(policies))) diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 2d04f916d98ec..ad223b7e34c67 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -232,6 +232,9 @@ type Config struct { // whether there's explicit filter ExplicitFilter bool `json:"-" toml:"-"` + + // KeyspaceName is the name of the keyspace of the task + KeyspaceName string `json:"keyspace-name" toml:"keyspace-name"` } // DefineCommonFlags defines the flags common to all BRIE commands. diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 8a5cd0425e221..3993d2fa88543 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/util/mathutil" "github.com/spf13/cobra" "github.com/spf13/pflag" + "github.com/tikv/client-go/v2/tikv" "go.uber.org/multierr" "go.uber.org/zap" ) @@ -51,6 +52,8 @@ const ( // FlagWithPlacementPolicy corresponds to tidb config with-tidb-placement-mode // current only support STRICT or IGNORE, the default is STRICT according to tidb. FlagWithPlacementPolicy = "with-tidb-placement-mode" + // FlagKeyspaceName corresponds to tidb config keyspace-name + FlagKeyspaceName = "keyspace-name" // FlagStreamStartTS and FlagStreamRestoreTS is used for log restore timestamp range. FlagStreamStartTS = "start-ts" @@ -206,6 +209,7 @@ func DefineRestoreFlags(flags *pflag.FlagSet) { // Do not expose this flag _ = flags.MarkHidden(flagNoSchema) flags.String(FlagWithPlacementPolicy, "STRICT", "correspond to tidb global/session variable with-tidb-placement-mode") + flags.String(FlagKeyspaceName, "", "correspond to tidb config keyspace-name") DefineRestoreCommonFlags(flags) } @@ -297,6 +301,10 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Annotatef(err, "failed to get flag %s", FlagWithPlacementPolicy) } + cfg.KeyspaceName, err = flags.GetString(FlagKeyspaceName) + if err != nil { + return errors.Annotatef(err, "failed to get flag %s", FlagKeyspaceName) + } if flags.Lookup(flagFullBackupType) != nil { // for restore full only @@ -414,7 +422,15 @@ func configureRestoreClient(ctx context.Context, client *restore.Client, cfg *Re client.SetPlacementPolicyMode(cfg.WithPlacementPolicy) client.SetWithSysTable(cfg.WithSysTable) - err := client.LoadRestoreStores(ctx) + err := restore.CheckKeyspaceBREnable(ctx, client.GetPDClient()) + if err != nil { + log.Warn("Keyspace BR is not supported in this cluster, fallback to legacy restore", zap.Error(err)) + client.SetRewriteMode(restore.RewriteModeLegacy) + } else { + client.SetRewriteMode(restore.RewriteModeKeyspace) + } + + err = client.LoadRestoreStores(ctx) if err != nil { return errors.Trace(err) } @@ -474,6 +490,9 @@ func IsStreamRestore(cmdName string) bool { // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { + config.UpdateGlobal(func(conf *config.Config) { + conf.KeyspaceName = cfg.KeyspaceName + }) if IsStreamRestore(cmdName) { return RunStreamRestore(c, g, cmdName, cfg) } @@ -497,6 +516,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf return errors.Trace(err) } defer mgr.Close() + codec := mgr.GetStorage().GetCodec() mergeRegionSize := cfg.MergeSmallRegionSizeBytes mergeRegionCount := cfg.MergeSmallRegionKeyCount @@ -655,10 +675,36 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf errCh := make(chan error, 32) tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, errCh) + if len(files) == 0 { log.Info("no files, empty databases and tables are restored") summary.SetSuccessStatus(true) // don't return immediately, wait all pipeline done. + } else { + oldKeyspace, _, err := tikv.DecodeKey(files[0].GetStartKey(), backupMeta.ApiVersion) + if err != nil { + return errors.Trace(err) + } + newKeyspace := codec.GetKeyspace() + + // If the API V2 data occurs in the restore process, the cluster must + // support the keyspace rewrite mode. + if (len(oldKeyspace) > 0 || len(newKeyspace) > 0) && client.GetRewriteMode() == restore.RewriteModeLegacy { + return errors.Annotate(berrors.ErrRestoreModeMismatch, "cluster only supports legacy rewrite mode") + } + + // Hijack the tableStream and rewrite the rewrite rules. + tableStream = util.ChanMap(tableStream, func(t restore.CreatedTable) restore.CreatedTable { + // Set the keyspace info for the checksum requests + t.RewriteRule.OldKeyspace = oldKeyspace + t.RewriteRule.NewKeyspace = newKeyspace + + for _, rule := range t.RewriteRule.Data { + rule.OldKeyPrefix = append(append([]byte{}, oldKeyspace...), rule.OldKeyPrefix...) + rule.NewKeyPrefix = codec.EncodeKey(rule.NewKeyPrefix) + } + return t + }) } if cfg.tiflashRecorder != nil { diff --git a/br/pkg/version/version.go b/br/pkg/version/version.go index 9cb974d48e13f..7bd629fb2c540 100644 --- a/br/pkg/version/version.go +++ b/br/pkg/version/version.go @@ -170,6 +170,15 @@ func CheckVersionForDDL(s *metapb.Store, tikvVersion *semver.Version) error { return nil } +// CheckVersionForKeyspaceBR checks whether the cluster is support Backup/Restore keyspace data. +func CheckVersionForKeyspaceBR(_ *metapb.Store, tikvVersion *semver.Version) error { + requireVersion := semver.New("6.6.0-alpha") + if tikvVersion.Compare(*requireVersion) < 0 { + return errors.Errorf("detected the old version of tidb cluster, require: >= 6.6.0, but got %s", tikvVersion.String()) + } + return nil +} + // CheckVersionForBR checks whether version of the cluster and BR itself is compatible. func CheckVersionForBR(s *metapb.Store, tikvVersion *semver.Version) error { BRVersion, err := semver.NewVersion(removeVAndHash(build.ReleaseVersion)) diff --git a/tablecodec/BUILD.bazel b/tablecodec/BUILD.bazel index 5752a60ddfdd1..db89b945b183d 100644 --- a/tablecodec/BUILD.bazel +++ b/tablecodec/BUILD.bazel @@ -21,6 +21,8 @@ go_library( "//util/rowcodec", "//util/stringutil", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", + "@com_github_tikv_client_go_v2//tikv", ], ) diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index b3726a414fa26..249a4f35495a0 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -23,6 +23,7 @@ import ( "unicode/utf8" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/charset" @@ -37,6 +38,7 @@ import ( "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/stringutil" + "github.com/tikv/client-go/v2/tikv" ) var ( @@ -276,6 +278,11 @@ func DecodeKeyHead(key kv.Key) (tableID int64, indexID int64, isRecordKey bool, // DecodeTableID decodes the table ID of the key, if the key is not table key, returns 0. func DecodeTableID(key kv.Key) int64 { + // If the key is in API V2, then ignore the prefix + _, k, err := tikv.DecodeKey(key, kvrpcpb.APIVersion_V2) + if err == nil { + key = k + } if !key.HasPrefix(tablePrefix) { return 0 } diff --git a/tests/realtikvtest/testkit.go b/tests/realtikvtest/testkit.go index 4b8a749e65c9d..8b05e1518e3ff 100644 --- a/tests/realtikvtest/testkit.go +++ b/tests/realtikvtest/testkit.go @@ -41,8 +41,17 @@ import ( "go.uber.org/goleak" ) -// WithRealTiKV is a flag identify whether tests run with real TiKV -var WithRealTiKV = flag.Bool("with-real-tikv", false, "whether tests run with real TiKV") +var ( + // WithRealTiKV is a flag identify whether tests run with real TiKV + WithRealTiKV = flag.Bool("with-real-tikv", false, "whether tests run with real TiKV") + + // TiKVPath is the path of the TiKV Storage. + TiKVPath = flag.String("tikv-path", "tikv://127.0.0.1:2379?disableGC=true", "TiKV addr") + + // KeyspaceName is an option to specify the name of keyspace that the tests run on, + // this option is only valid while the flag WithRealTiKV is set. + KeyspaceName = flag.String("keyspace-name", "", "the name of keyspace that the tests run on") +) // RunTestMain run common setups for all real tikv tests. func RunTestMain(m *testing.M) { @@ -98,8 +107,9 @@ func CreateMockStoreAndDomainAndSetup(t *testing.T, opts ...mockstore.MockTiKVSt var d driver.TiKVDriver config.UpdateGlobal(func(conf *config.Config) { conf.TxnLocalLatches.Enabled = false + conf.KeyspaceName = *KeyspaceName }) - store, err = d.Open("tikv://127.0.0.1:2379?disableGC=true") + store, err = d.Open(*TiKVPath) require.NoError(t, err) dom, err = session.BootstrapSession(store)