From 8608f42dccb333b92c79ae9c54c2284a96e6b13f Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 6 Dec 2022 11:43:12 +0800 Subject: [PATCH] add retry for azblob read file Signed-off-by: Leavrth --- br/cmd/br/cmd.go | 3 +- br/cmd/br/debug.go | 4 +- br/pkg/backup/client.go | 2 +- br/pkg/backup/schema.go | 8 ++-- br/pkg/backup/schema_test.go | 3 +- br/pkg/{utils => metautil}/schema.go | 13 +++-- br/pkg/{utils => metautil}/schema_test.go | 23 +++++---- br/pkg/restore/client.go | 26 +++++----- br/pkg/restore/client_test.go | 3 +- br/pkg/restore/db.go | 21 ++++----- br/pkg/restore/systable_restore.go | 22 ++++----- br/pkg/restore/tiflashrec/tiflash_recorder.go | 4 +- br/pkg/restore/util.go | 6 +-- br/pkg/storage/azblob.go | 22 +++++++-- br/pkg/task/common.go | 5 +- br/pkg/task/restore.go | 10 ++-- br/pkg/task/restore_test.go | 47 +++++++++---------- br/pkg/task/stream.go | 8 ++-- 18 files changed, 120 insertions(+), 110 deletions(-) rename br/pkg/{utils => metautil}/schema.go (90%) rename br/pkg/{utils => metautil}/schema_test.go (94%) diff --git a/br/cmd/br/cmd.go b/br/cmd/br/cmd.go index f3aeb3393df52..bbcce18fffb6c 100644 --- a/br/cmd/br/cmd.go +++ b/br/cmd/br/cmd.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/gluetidb" + "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/redact" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/task" @@ -34,7 +35,7 @@ var ( filterOutSysAndMemTables = []string{ "*.*", - fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")), + fmt.Sprintf("!%s.*", metautil.TemporaryDBName("*")), "!mysql.*", "mysql.user", "mysql.db", diff --git a/br/cmd/br/debug.go b/br/cmd/br/debug.go index c62dd677f9a66..179bebdd17eee 100644 --- a/br/cmd/br/debug.go +++ b/br/cmd/br/debug.go @@ -80,7 +80,7 @@ func newCheckSumCommand() *cobra.Command { } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - dbs, err := utils.LoadBackupTables(ctx, reader) + dbs, err := metautil.LoadBackupTables(ctx, reader) if err != nil { return errors.Trace(err) } @@ -182,7 +182,7 @@ func newBackupMetaValidateCommand() *cobra.Command { return errors.Trace(err) } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - dbs, err := utils.LoadBackupTables(ctx, reader) + dbs, err := metautil.LoadBackupTables(ctx, reader) if err != nil { log.Error("load tables failed", zap.Error(err)) return errors.Trace(err) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 0241789e65103..26310aa0c7db6 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -570,7 +570,7 @@ func BuildBackupRangeAndSchema( switch { case tableInfo.IsSequence(): globalAutoID, err = seqAlloc.NextGlobalAutoID() - case tableInfo.IsView() || !utils.NeedAutoID(tableInfo): + case tableInfo.IsView() || !metautil.NeedAutoID(tableInfo): // no auto ID for views or table without either rowID nor auto_increment ID. default: globalAutoID, err = idAlloc.NextGlobalAutoID() diff --git a/br/pkg/backup/schema.go b/br/pkg/backup/schema.go index bb0cf7f884189..9425f0aca2ad6 100644 --- a/br/pkg/backup/schema.go +++ b/br/pkg/backup/schema.go @@ -65,13 +65,13 @@ func (ss *Schemas) AddSchema( dbInfo *model.DBInfo, tableInfo *model.TableInfo, ) { if tableInfo == nil { - ss.schemas[utils.EncloseName(dbInfo.Name.L)] = &schemaInfo{ + ss.schemas[metautil.EncloseName(dbInfo.Name.L)] = &schemaInfo{ dbInfo: dbInfo, } return } name := fmt.Sprintf("%s.%s", - utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L)) + metautil.EncloseName(dbInfo.Name.L), metautil.EncloseName(tableInfo.Name.L)) ss.schemas[name] = &schemaInfo{ tableInfo: tableInfo, dbInfo: dbInfo, @@ -106,8 +106,8 @@ func (ss *Schemas) BackupSchemas( schema := s // Because schema.dbInfo is a pointer that many tables point to. // Remove "add Temporary-prefix into dbName" from closure to prevent concurrent operations. - if utils.IsSysDB(schema.dbInfo.Name.L) { - schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O) + if metautil.IsSysDB(schema.dbInfo.Name.L) { + schema.dbInfo.Name = metautil.TemporaryDBName(schema.dbInfo.Name.O) } var checksum *checkpoint.ChecksumItem diff --git a/br/pkg/backup/schema_test.go b/br/pkg/backup/schema_test.go index 08d560bf03c25..f02b3cbd271b2 100644 --- a/br/pkg/backup/schema_test.go +++ b/br/pkg/backup/schema_test.go @@ -17,7 +17,6 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/testkit" filter "github.com/pingcap/tidb/util/table-filter" @@ -314,7 +313,7 @@ func TestBackupSchemasForSystemTable(t *testing.T) { schemas2 := GetSchemasFromMeta(t, es2) require.Len(t, schemas2, systemTablesCount) for _, schema := range schemas2 { - require.Equal(t, utils.TemporaryDBName("mysql"), schema.DB.Name) + require.Equal(t, metautil.TemporaryDBName("mysql"), schema.DB.Name) require.Equal(t, true, strings.HasPrefix(schema.Info.Name.O, tablePrefix)) } } diff --git a/br/pkg/utils/schema.go b/br/pkg/metautil/schema.go similarity index 90% rename from br/pkg/utils/schema.go rename to br/pkg/metautil/schema.go index 8ceba24e140ad..d5ffedd5a2dc9 100644 --- a/br/pkg/utils/schema.go +++ b/br/pkg/metautil/schema.go @@ -1,6 +1,6 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. -package utils +package metautil import ( "context" @@ -9,7 +9,6 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" ) @@ -27,11 +26,11 @@ func NeedAutoID(tblInfo *model.TableInfo) bool { // Database wraps the schema and tables of a database. type Database struct { Info *model.DBInfo - Tables []*metautil.Table + Tables []*Table } // GetTable returns a table of the database by name. -func (db *Database) GetTable(name string) *metautil.Table { +func (db *Database) GetTable(name string) *Table { for _, table := range db.Tables { if table.Info.Name.String() == name { return table @@ -41,8 +40,8 @@ func (db *Database) GetTable(name string) *metautil.Table { } // LoadBackupTables loads schemas from BackupMeta. -func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[string]*Database, error) { - ch := make(chan *metautil.Table) +func LoadBackupTables(ctx context.Context, reader *MetaReader) (map[string]*Database, error) { + ch := make(chan *Table) errCh := make(chan error) go func() { if err := reader.ReadSchemasFiles(ctx, ch); err != nil { @@ -68,7 +67,7 @@ func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[str if !ok { db = &Database{ Info: table.DB, - Tables: make([]*metautil.Table, 0), + Tables: make([]*Table, 0), } databases[dbName] = db } diff --git a/br/pkg/utils/schema_test.go b/br/pkg/metautil/schema_test.go similarity index 94% rename from br/pkg/utils/schema_test.go rename to br/pkg/metautil/schema_test.go index 95d649000d9ce..b36265da00bf9 100644 --- a/br/pkg/utils/schema_test.go +++ b/br/pkg/metautil/schema_test.go @@ -1,6 +1,6 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. -package utils +package metautil import ( "context" @@ -11,7 +11,6 @@ import ( "github.com/golang/protobuf/proto" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" - "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/statistics/handle" @@ -84,12 +83,12 @@ func TestLoadBackupMeta(t *testing.T) { require.NoError(t, err) ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) + err = store.WriteFile(ctx, MetaFile, data) require.NoError(t, err) dbs, err := LoadBackupTables( ctx, - metautil.NewMetaReader( + NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -179,12 +178,12 @@ func TestLoadBackupMetaPartionTable(t *testing.T) { require.NoError(t, err) ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) + err = store.WriteFile(ctx, MetaFile, data) require.NoError(t, err) dbs, err := LoadBackupTables( ctx, - metautil.NewMetaReader( + NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -265,12 +264,12 @@ func BenchmarkLoadBackupMeta64(b *testing.B) { require.NoError(b, err) ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) + err = store.WriteFile(ctx, MetaFile, data) require.NoError(b, err) dbs, err := LoadBackupTables( ctx, - metautil.NewMetaReader( + NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -297,12 +296,12 @@ func BenchmarkLoadBackupMeta1024(b *testing.B) { require.NoError(b, err) ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) + err = store.WriteFile(ctx, MetaFile, data) require.NoError(b, err) dbs, err := LoadBackupTables( ctx, - metautil.NewMetaReader( + NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -329,12 +328,12 @@ func BenchmarkLoadBackupMeta10240(b *testing.B) { require.NoError(b, err) ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) + err = store.WriteFile(ctx, MetaFile, data) require.NoError(b, err) dbs, err := LoadBackupTables( ctx, - metautil.NewMetaReader( + NewMetaReader( meta, store, &backuppb.CipherInfo{ diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 9e4e5a389b935..c8c36dd9c54f2 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -89,7 +89,7 @@ type Client struct { tlsConf *tls.Config keepaliveConf keepalive.ClientParameters - databases map[string]*utils.Database + databases map[string]*metautil.Database ddlJobs []*model.Job // store tables need to rebase info like auto id and random id and so on after create table @@ -358,7 +358,7 @@ func (rc *Client) InitBackupMeta( backend *backuppb.StorageBackend, reader *metautil.MetaReader) error { if !backupMeta.IsRawKv { - databases, err := utils.LoadBackupTables(c, reader) + databases, err := metautil.LoadBackupTables(c, reader) if err != nil { return errors.Trace(err) } @@ -525,8 +525,8 @@ func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]pd } // GetDatabases returns all databases. -func (rc *Client) GetDatabases() []*utils.Database { - dbs := make([]*utils.Database, 0, len(rc.databases)) +func (rc *Client) GetDatabases() []*metautil.Database { + dbs := make([]*metautil.Database, 0, len(rc.databases)) for _, db := range rc.databases { dbs = append(dbs, db) } @@ -534,14 +534,14 @@ func (rc *Client) GetDatabases() []*utils.Database { } // GetDatabase returns a database by name. -func (rc *Client) GetDatabase(name string) *utils.Database { +func (rc *Client) GetDatabase(name string) *metautil.Database { return rc.databases[name] } // HasBackedUpSysDB whether we have backed up system tables // br backs system tables up since 5.1.0 func (rc *Client) HasBackedUpSysDB() bool { - temporaryDB := utils.TemporaryDBName(mysql.SystemDB) + temporaryDB := metautil.TemporaryDBName(mysql.SystemDB) _, backedUp := rc.databases[temporaryDB.O] return backedUp } @@ -927,8 +927,8 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau log.Info("checking target cluster system table compatibility with backed up data") privilegeTablesInBackup := make([]*metautil.Table, 0) for _, table := range tables { - decodedSysDBName, ok := utils.GetSysDBCIStrName(table.DB.Name) - if ok && utils.IsSysDB(decodedSysDBName.L) && sysPrivilegeTableMap[table.Info.Name.L] != "" { + decodedSysDBName, ok := metautil.GetSysDBCIStrName(table.DB.Name) + if ok && metautil.IsSysDB(decodedSysDBName.L) && sysPrivilegeTableMap[table.Info.Name.L] != "" { privilegeTablesInBackup = append(privilegeTablesInBackup, table) } } @@ -1769,8 +1769,8 @@ func (rc *Client) FixIndex(ctx context.Context, schema, table, index string) err } sql := fmt.Sprintf("ADMIN RECOVER INDEX %s %s;", - utils.EncloseDBAndTable(schema, table), - utils.EncloseName(index)) + metautil.EncloseDBAndTable(schema, table), + metautil.EncloseName(index)) log.Debug("Executing fix index sql.", zap.String("sql", sql)) err := rc.db.se.Execute(ctx, sql) if err != nil { @@ -1786,7 +1786,7 @@ func (rc *Client) FixIndicesOfTables( onProgress func(), ) error { for _, table := range fullBackupTables { - if name, ok := utils.GetSysDBName(table.DB.Name); utils.IsSysDB(name) && ok { + if name, ok := metautil.GetSysDBName(table.DB.Name); metautil.IsSysDB(name) && ok { // skip system table for now onProgress() continue @@ -2078,7 +2078,7 @@ func (rc *Client) InitSchemasReplaceForDDL( dbMap := make(map[stream.OldID]*stream.DBReplace) for _, t := range *tables { - name, _ := utils.GetSysDBName(t.DB.Name) + name, _ := metautil.GetSysDBName(t.DB.Name) dbName := model.NewCIStr(name) newDBInfo, exist := rc.GetDBSchema(rc.GetDomain(), dbName) if !exist { @@ -2624,7 +2624,7 @@ func (rc *Client) SetWithSysTable(withSysTable bool) { } // MockClient create a fake client used to test. -func MockClient(dbs map[string]*utils.Database) *Client { +func MockClient(dbs map[string]*metautil.Database) *Client { return &Client{databases: dbs} } diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index e1f12ddbf7a1d..b2012f42a56d9 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" "github.com/pingcap/tidb/br/pkg/stream" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -190,7 +189,7 @@ func TestCheckSysTableCompatibility(t *testing.T) { dbSchema, isExist := info.SchemaByName(model.NewCIStr(mysql.SystemDB)) require.True(t, isExist) tmpSysDB := dbSchema.Clone() - tmpSysDB.Name = utils.TemporaryDBName(mysql.SystemDB) + tmpSysDB.Name = metautil.TemporaryDBName(mysql.SystemDB) sysDB := model.NewCIStr(mysql.SystemDB) userTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("user")) require.NoError(t, err) diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index ae62162c3e890..310f76d4820cf 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -12,7 +12,6 @@ import ( "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/metautil" prealloctableid "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" @@ -116,7 +115,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { } if tableInfo != nil { - switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName)) + switchDBSQL := fmt.Sprintf("use %s;", metautil.EncloseName(ddlJob.SchemaName)) err = db.se.Execute(ctx, switchDBSQL) if err != nil { log.Error("switch db failed", @@ -183,8 +182,8 @@ func (db *DB) restoreSequence(ctx context.Context, table *metautil.Table) error var err error if table.Info.IsSequence() { setValFormat := fmt.Sprintf("do setval(%s.%s, %%d);", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O)) + metautil.EncloseName(table.DB.Name.O), + metautil.EncloseName(table.Info.Name.O)) if table.Info.Sequence.Cycle { increment := table.Info.Sequence.Increment // TiDB sequence's behaviour is designed to keep the same pace @@ -193,8 +192,8 @@ func (db *DB) restoreSequence(ctx context.Context, table *metautil.Table) error // https://github.com/pingcap/br/pull/242#issuecomment-631307978 // TODO use sql to set cycle round nextSeqSQL := fmt.Sprintf("do nextval(%s.%s);", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O)) + metautil.EncloseName(table.DB.Name.O), + metautil.EncloseName(table.Info.Name.O)) var setValSQL string if increment < 0 { setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue) @@ -248,17 +247,17 @@ func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table, } // only table exists in restored cluster during incremental restoration should do alter after creation. case toBeCorrectedTables[UniqueTableName{table.DB.Name.String(), table.Info.Name.String()}]: - if utils.NeedAutoID(table.Info) { + if metautil.NeedAutoID(table.Info) { restoreMetaSQL = fmt.Sprintf( "alter table %s.%s auto_increment = %d;", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O), + metautil.EncloseName(table.DB.Name.O), + metautil.EncloseName(table.Info.Name.O), table.Info.AutoIncID) } else if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() { restoreMetaSQL = fmt.Sprintf( "alter table %s.%s auto_random_base = %d", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O), + metautil.EncloseName(table.DB.Name.O), + metautil.EncloseName(table.Info.Name.O), table.Info.AutoRandID) } else { log.Info("table exists in incremental ddl jobs, but don't need to be altered", diff --git a/br/pkg/restore/systable_restore.go b/br/pkg/restore/systable_restore.go index 02ea0860d5425..7502fea1c365c 100644 --- a/br/pkg/restore/systable_restore.go +++ b/br/pkg/restore/systable_restore.go @@ -11,7 +11,7 @@ import ( "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" filter "github.com/pingcap/tidb/util/table-filter" @@ -76,7 +76,7 @@ func isStatsTable(tableName string) bool { func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) { sysDB := mysql.SystemDB - temporaryDB := utils.TemporaryDBName(sysDB) + temporaryDB := metautil.TemporaryDBName(sysDB) defer rc.cleanTemporaryDatabase(ctx, sysDB) if !f.MatchSchema(sysDB) || !rc.withSysTable { @@ -133,7 +133,7 @@ func (rc *Client) getDatabaseByName(name string) (*database, bool) { db := &database{ ExistingTables: map[string]*model.TableInfo{}, Name: model.NewCIStr(name), - TemporaryName: utils.TemporaryDBName(name), + TemporaryName: metautil.TemporaryDBName(name), } for _, t := range schema.Tables { db.ExistingTables[t.Name.L] = t @@ -210,7 +210,7 @@ func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, ti *model log.Info("full cluster restore, delete existing data", zap.String("table", tableName), zap.Stringer("schema", db.Name)) deleteSQL := fmt.Sprintf("DELETE FROM %s %s;", - utils.EncloseDBAndTable(db.Name.L, tableName), whereClause) + metautil.EncloseDBAndTable(db.Name.L, tableName), whereClause) if err := execSQL(deleteSQL); err != nil { return err } @@ -221,28 +221,28 @@ func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, ti *model // target column order may different with source cluster columnNames := make([]string, 0, len(ti.Columns)) for _, col := range ti.Columns { - columnNames = append(columnNames, utils.EncloseName(col.Name.L)) + columnNames = append(columnNames, metautil.EncloseName(col.Name.L)) } colListStr := strings.Join(columnNames, ",") replaceIntoSQL := fmt.Sprintf("REPLACE INTO %s(%s) SELECT %s FROM %s %s;", - utils.EncloseDBAndTable(db.Name.L, tableName), + metautil.EncloseDBAndTable(db.Name.L, tableName), colListStr, colListStr, - utils.EncloseDBAndTable(db.TemporaryName.L, tableName), + metautil.EncloseDBAndTable(db.TemporaryName.L, tableName), whereClause) return execSQL(replaceIntoSQL) } renameSQL := fmt.Sprintf("RENAME TABLE %s TO %s;", - utils.EncloseDBAndTable(db.TemporaryName.L, tableName), - utils.EncloseDBAndTable(db.Name.L, tableName), + metautil.EncloseDBAndTable(db.TemporaryName.L, tableName), + metautil.EncloseDBAndTable(db.Name.L, tableName), ) return execSQL(renameSQL) } func (rc *Client) cleanTemporaryDatabase(ctx context.Context, originDB string) { - database := utils.TemporaryDBName(originDB) + database := metautil.TemporaryDBName(originDB) log.Debug("dropping temporary database", zap.Stringer("database", database)) - sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", utils.EncloseName(database.L)) + sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", metautil.EncloseName(database.L)) if err := rc.db.se.Execute(ctx, sql); err != nil { logutil.WarnTerm("failed to drop temporary database, it should be dropped manually", zap.Stringer("database", database), diff --git a/br/pkg/restore/tiflashrec/tiflash_recorder.go b/br/pkg/restore/tiflashrec/tiflash_recorder.go index 31dde982a7b69..4adfdb08b0de9 100644 --- a/br/pkg/restore/tiflashrec/tiflash_recorder.go +++ b/br/pkg/restore/tiflashrec/tiflash_recorder.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/format" @@ -99,7 +99,7 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []s } items = append(items, fmt.Sprintf( "ALTER TABLE %s %s", - utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O), + metautil.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O), altTableSpec), ) }) diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index 73a4411c445c1..af6fe657adfc2 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -20,10 +20,10 @@ import ( berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/redact" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/rtree" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" @@ -575,8 +575,8 @@ func ZapTables(tables []CreatedTable) zapcore.Field { names := make([]string, 0, len(tables)) for _, t := range tables { names = append(names, fmt.Sprintf("%s.%s", - utils.EncloseName(t.OldTable.DB.Name.String()), - utils.EncloseName(t.OldTable.Info.Name.String()))) + metautil.EncloseName(t.OldTable.DB.Name.String()), + metautil.EncloseName(t.OldTable.Info.Name.String()))) } return names }) diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index c557a79e3ac8f..2b9c33946a876 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -19,6 +19,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/spf13/pflag" "go.uber.org/zap" ) @@ -30,6 +31,8 @@ const ( azblobAccountKey = "azblob.account-key" ) +const maxRetry = 5 + // AzblobBackendOptions is the options for Azure Blob storage. type AzblobBackendOptions struct { Endpoint string `json:"endpoint" toml:"endpoint"` @@ -280,10 +283,23 @@ func (s *AzureBlobStorage) WriteFile(ctx context.Context, name string, data []by // ReadFile reads a file from Azure Blob Storage. func (s *AzureBlobStorage) ReadFile(ctx context.Context, name string) ([]byte, error) { client := s.containerClient.NewBlockBlobClient(s.withPrefix(name)) - resp, err := client.Download(ctx, nil) - if err != nil { - return nil, errors.Annotatef(err, "Failed to download azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) + var resp *azblob.DownloadResponse + var err error + for retryTimes := 0; ; retryTimes++ { + if retryTimes == maxRetry { + return nil, errors.Annotatef(err, "Failed to retry to download azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) + } + resp, err = client.Download(ctx, nil) + if err != nil { + if utils.MessageIsRetryableStorageError(err.Error()) { + log.Warn("Failed to download azure blob file, file info", zap.String("bucket(container)", s.options.Bucket), zap.String("key", s.withPrefix(name)), zap.Int("retry", retryTimes), zap.Error(err)) + continue + } + return nil, errors.Annotatef(err, "Failed to download azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) + } + break } + defer resp.RawResponse.Body.Close() data, err := io.ReadAll(resp.Body(azblob.RetryReaderOptions{})) if err != nil { diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 2d04f916d98ec..7b093e19fd7b5 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/sessionctx/variable" filter "github.com/pingcap/tidb/util/table-filter" "github.com/spf13/cobra" @@ -506,13 +505,13 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { if len(db) == 0 { return errors.Annotate(berrors.ErrInvalidArgument, "empty database name is not allowed") } - cfg.Schemas[utils.EncloseName(db)] = struct{}{} + cfg.Schemas[metautil.EncloseName(db)] = struct{}{} if tblFlag := flags.Lookup(flagTable); tblFlag != nil { tbl := tblFlag.Value.String() if len(tbl) == 0 { return errors.Annotate(berrors.ErrInvalidArgument, "empty table name is not allowed") } - cfg.Tables[utils.EncloseDBAndTable(db, tbl)] = struct{}{} + cfg.Tables[metautil.EncloseDBAndTable(db, tbl)] = struct{}{} cfg.TableFilter = filter.NewTablesFilter(filter.Table{ Schema: db, Name: tbl, diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 83c22a29e61db..0147f79b3ff0d 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -421,16 +421,16 @@ func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error { tablesMap := make(map[string]struct{}) for _, db := range schemas { dbName := db.Info.Name.L - if dbCIStrName, ok := utils.GetSysDBCIStrName(db.Info.Name); utils.IsSysDB(dbCIStrName.O) && ok { + if dbCIStrName, ok := metautil.GetSysDBCIStrName(db.Info.Name); metautil.IsSysDB(dbCIStrName.O) && ok { dbName = dbCIStrName.L } - schemasMap[utils.EncloseName(dbName)] = struct{}{} + schemasMap[metautil.EncloseName(dbName)] = struct{}{} for _, table := range db.Tables { if table.Info == nil { // we may back up empty database. continue } - tablesMap[utils.EncloseDBAndTable(dbName, table.Info.Name.L)] = struct{}{} + tablesMap[metautil.EncloseDBAndTable(dbName, table.Info.Name.L)] = struct{}{} } } restoreSchemas := cfg.Schemas @@ -797,10 +797,10 @@ func dropToBlackhole( func filterRestoreFiles( client *restore.Client, cfg *RestoreConfig, -) (files []*backuppb.File, tables []*metautil.Table, dbs []*utils.Database) { +) (files []*backuppb.File, tables []*metautil.Table, dbs []*metautil.Database) { for _, db := range client.GetDatabases() { dbName := db.Info.Name.O - if name, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(name) && ok { + if name, ok := metautil.GetSysDBName(db.Info.Name); metautil.IsSysDB(name) && ok { dbName = name } if !cfg.TableFilter.MatchSchema(dbName) { diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index b13ecf0eccc08..66982a0626530 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -16,7 +16,6 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/tablecodec" @@ -77,15 +76,15 @@ func TestCheckRestoreDBAndTable(t *testing.T) { cases := []struct { cfgSchemas map[string]struct{} cfgTables map[string]struct{} - backupDBs map[string]*utils.Database + backupDBs map[string]*metautil.Database }{ { cfgSchemas: map[string]struct{}{ - utils.EncloseName("test"): {}, + metautil.EncloseName("test"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("test", "t"): {}, - utils.EncloseDBAndTable("test", "t2"): {}, + metautil.EncloseDBAndTable("test", "t"): {}, + metautil.EncloseDBAndTable("test", "t2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "test": {"T", "T2"}, @@ -93,11 +92,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - utils.EncloseName("mysql"): {}, + metautil.EncloseName("mysql"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("mysql", "t"): {}, - utils.EncloseDBAndTable("mysql", "t2"): {}, + metautil.EncloseDBAndTable("mysql", "t"): {}, + metautil.EncloseDBAndTable("mysql", "t2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "__TiDB_BR_Temporary_mysql": {"T", "T2"}, @@ -105,11 +104,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - utils.EncloseName("test"): {}, + metautil.EncloseName("test"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("test", "T"): {}, - utils.EncloseDBAndTable("test", "T2"): {}, + metautil.EncloseDBAndTable("test", "T"): {}, + metautil.EncloseDBAndTable("test", "T2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "test": {"t", "t2"}, @@ -117,11 +116,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - utils.EncloseName("TEST"): {}, + metautil.EncloseName("TEST"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("TEST", "t"): {}, - utils.EncloseDBAndTable("TEST", "T2"): {}, + metautil.EncloseDBAndTable("TEST", "t"): {}, + metautil.EncloseDBAndTable("TEST", "T2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "test": {"t", "t2"}, @@ -129,11 +128,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - utils.EncloseName("TeSt"): {}, + metautil.EncloseName("TeSt"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("TeSt", "tabLe"): {}, - utils.EncloseDBAndTable("TeSt", "taBle2"): {}, + metautil.EncloseDBAndTable("TeSt", "tabLe"): {}, + metautil.EncloseDBAndTable("TeSt", "taBle2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "TesT": {"TablE", "taBle2"}, @@ -141,13 +140,13 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - utils.EncloseName("TeSt"): {}, - utils.EncloseName("MYSQL"): {}, + metautil.EncloseName("TeSt"): {}, + metautil.EncloseName("MYSQL"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("TeSt", "tabLe"): {}, - utils.EncloseDBAndTable("TeSt", "taBle2"): {}, - utils.EncloseDBAndTable("MYSQL", "taBle"): {}, + metautil.EncloseDBAndTable("TeSt", "tabLe"): {}, + metautil.EncloseDBAndTable("TeSt", "taBle2"): {}, + metautil.EncloseDBAndTable("MYSQL", "taBle"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "TesT": {"table", "TaBLE2"}, @@ -167,7 +166,7 @@ func TestCheckRestoreDBAndTable(t *testing.T) { } } -func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string) map[string]*utils.Database { +func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string) map[string]*metautil.Database { testDir := t.TempDir() store, err := storage.NewLocalStorage(testDir) require.NoError(t, err) @@ -236,7 +235,7 @@ func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string) err = store.WriteFile(ctx, metautil.MetaFile, data) require.NoError(t, err) - dbs, err := utils.LoadBackupTables( + dbs, err := metautil.LoadBackupTables( ctx, metautil.NewMetaReader( meta, diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index fdcc728a9ce5f..e1e0e1a560bf5 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1487,7 +1487,7 @@ func initFullBackupTables( // read full backup databases to get map[table]table.Info reader := metautil.NewMetaReader(backupMeta, s, nil) - databases, err := utils.LoadBackupTables(ctx, reader) + databases, err := metautil.LoadBackupTables(ctx, reader) if err != nil { return nil, errors.Trace(err) } @@ -1495,7 +1495,7 @@ func initFullBackupTables( tables := make(map[int64]*metautil.Table) for _, db := range databases { dbName := db.Info.Name.O - if name, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(name) && ok { + if name, ok := metautil.GetSysDBName(db.Info.Name); metautil.IsSysDB(name) && ok { dbName = name } @@ -1523,7 +1523,7 @@ func initRewriteRules(client *restore.Client, tables map[int64]*metautil.Table) // compare table exists in cluster and map[table]table.Info to get rewrite rules. rules := make(map[int64]*restore.RewriteRules) for _, t := range tables { - if name, ok := utils.GetSysDBName(t.DB.Name); utils.IsSysDB(name) && ok { + if name, ok := metautil.GetSysDBName(t.DB.Name); metautil.IsSysDB(name) && ok { // skip system table for now continue } @@ -1565,7 +1565,7 @@ func updateRewriteRules(rules map[int64]*restore.RewriteRules, schemasReplace *s for _, dbReplace := range schemasReplace.DbMap { if dbReplace.OldDBInfo == nil || - utils.IsSysDB(dbReplace.OldDBInfo.Name.O) || + metautil.IsSysDB(dbReplace.OldDBInfo.Name.O) || !filter.MatchSchema(dbReplace.OldDBInfo.Name.O) { continue }