Skip to content

Commit

Permalink
add retry for azblob read file
Browse files Browse the repository at this point in the history
Signed-off-by: Leavrth <jianjun.liao@outlook.com>
  • Loading branch information
Leavrth committed Dec 6, 2022
1 parent 2d30149 commit 8608f42
Show file tree
Hide file tree
Showing 18 changed files with 120 additions and 110 deletions.
3 changes: 2 additions & 1 deletion br/cmd/br/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/gluetidb"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/task"
Expand All @@ -34,7 +35,7 @@ var (

filterOutSysAndMemTables = []string{
"*.*",
fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")),
fmt.Sprintf("!%s.*", metautil.TemporaryDBName("*")),
"!mysql.*",
"mysql.user",
"mysql.db",
Expand Down
4 changes: 2 additions & 2 deletions br/cmd/br/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func newCheckSumCommand() *cobra.Command {
}

reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
dbs, err := utils.LoadBackupTables(ctx, reader)
dbs, err := metautil.LoadBackupTables(ctx, reader)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func newBackupMetaValidateCommand() *cobra.Command {
return errors.Trace(err)
}
reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
dbs, err := utils.LoadBackupTables(ctx, reader)
dbs, err := metautil.LoadBackupTables(ctx, reader)
if err != nil {
log.Error("load tables failed", zap.Error(err))
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func BuildBackupRangeAndSchema(
switch {
case tableInfo.IsSequence():
globalAutoID, err = seqAlloc.NextGlobalAutoID()
case tableInfo.IsView() || !utils.NeedAutoID(tableInfo):
case tableInfo.IsView() || !metautil.NeedAutoID(tableInfo):
// no auto ID for views or table without either rowID nor auto_increment ID.
default:
globalAutoID, err = idAlloc.NextGlobalAutoID()
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func (ss *Schemas) AddSchema(
dbInfo *model.DBInfo, tableInfo *model.TableInfo,
) {
if tableInfo == nil {
ss.schemas[utils.EncloseName(dbInfo.Name.L)] = &schemaInfo{
ss.schemas[metautil.EncloseName(dbInfo.Name.L)] = &schemaInfo{
dbInfo: dbInfo,
}
return
}
name := fmt.Sprintf("%s.%s",
utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L))
metautil.EncloseName(dbInfo.Name.L), metautil.EncloseName(tableInfo.Name.L))
ss.schemas[name] = &schemaInfo{
tableInfo: tableInfo,
dbInfo: dbInfo,
Expand Down Expand Up @@ -106,8 +106,8 @@ func (ss *Schemas) BackupSchemas(
schema := s
// Because schema.dbInfo is a pointer that many tables point to.
// Remove "add Temporary-prefix into dbName" from closure to prevent concurrent operations.
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
if metautil.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = metautil.TemporaryDBName(schema.dbInfo.Name.O)
}

var checksum *checkpoint.ChecksumItem
Expand Down
3 changes: 1 addition & 2 deletions br/pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
filter "github.com/pingcap/tidb/util/table-filter"
Expand Down Expand Up @@ -314,7 +313,7 @@ func TestBackupSchemasForSystemTable(t *testing.T) {
schemas2 := GetSchemasFromMeta(t, es2)
require.Len(t, schemas2, systemTablesCount)
for _, schema := range schemas2 {
require.Equal(t, utils.TemporaryDBName("mysql"), schema.DB.Name)
require.Equal(t, metautil.TemporaryDBName("mysql"), schema.DB.Name)
require.Equal(t, true, strings.HasPrefix(schema.Info.Name.O, tablePrefix))
}
}
13 changes: 6 additions & 7 deletions br/pkg/utils/schema.go → br/pkg/metautil/schema.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package utils
package metautil

import (
"context"
Expand All @@ -9,7 +9,6 @@ import (

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
)
Expand All @@ -27,11 +26,11 @@ func NeedAutoID(tblInfo *model.TableInfo) bool {
// Database wraps the schema and tables of a database.
type Database struct {
Info *model.DBInfo
Tables []*metautil.Table
Tables []*Table
}

// GetTable returns a table of the database by name.
func (db *Database) GetTable(name string) *metautil.Table {
func (db *Database) GetTable(name string) *Table {
for _, table := range db.Tables {
if table.Info.Name.String() == name {
return table
Expand All @@ -41,8 +40,8 @@ func (db *Database) GetTable(name string) *metautil.Table {
}

// LoadBackupTables loads schemas from BackupMeta.
func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[string]*Database, error) {
ch := make(chan *metautil.Table)
func LoadBackupTables(ctx context.Context, reader *MetaReader) (map[string]*Database, error) {
ch := make(chan *Table)
errCh := make(chan error)
go func() {
if err := reader.ReadSchemasFiles(ctx, ch); err != nil {
Expand All @@ -68,7 +67,7 @@ func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[str
if !ok {
db = &Database{
Info: table.DB,
Tables: make([]*metautil.Table, 0),
Tables: make([]*Table, 0),
}
databases[dbName] = db
}
Expand Down
23 changes: 11 additions & 12 deletions br/pkg/utils/schema_test.go → br/pkg/metautil/schema_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package utils
package metautil

import (
"context"
Expand All @@ -11,7 +11,6 @@ import (
"github.com/golang/protobuf/proto"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle"
Expand Down Expand Up @@ -84,12 +83,12 @@ func TestLoadBackupMeta(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
err = store.WriteFile(ctx, metautil.MetaFile, data)
err = store.WriteFile(ctx, MetaFile, data)
require.NoError(t, err)

dbs, err := LoadBackupTables(
ctx,
metautil.NewMetaReader(
NewMetaReader(
meta,
store,
&backuppb.CipherInfo{
Expand Down Expand Up @@ -179,12 +178,12 @@ func TestLoadBackupMetaPartionTable(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
err = store.WriteFile(ctx, metautil.MetaFile, data)
err = store.WriteFile(ctx, MetaFile, data)
require.NoError(t, err)

dbs, err := LoadBackupTables(
ctx,
metautil.NewMetaReader(
NewMetaReader(
meta,
store,
&backuppb.CipherInfo{
Expand Down Expand Up @@ -265,12 +264,12 @@ func BenchmarkLoadBackupMeta64(b *testing.B) {
require.NoError(b, err)

ctx := context.Background()
err = store.WriteFile(ctx, metautil.MetaFile, data)
err = store.WriteFile(ctx, MetaFile, data)
require.NoError(b, err)

dbs, err := LoadBackupTables(
ctx,
metautil.NewMetaReader(
NewMetaReader(
meta,
store,
&backuppb.CipherInfo{
Expand All @@ -297,12 +296,12 @@ func BenchmarkLoadBackupMeta1024(b *testing.B) {
require.NoError(b, err)

ctx := context.Background()
err = store.WriteFile(ctx, metautil.MetaFile, data)
err = store.WriteFile(ctx, MetaFile, data)
require.NoError(b, err)

dbs, err := LoadBackupTables(
ctx,
metautil.NewMetaReader(
NewMetaReader(
meta,
store,
&backuppb.CipherInfo{
Expand All @@ -329,12 +328,12 @@ func BenchmarkLoadBackupMeta10240(b *testing.B) {
require.NoError(b, err)

ctx := context.Background()
err = store.WriteFile(ctx, metautil.MetaFile, data)
err = store.WriteFile(ctx, MetaFile, data)
require.NoError(b, err)

dbs, err := LoadBackupTables(
ctx,
metautil.NewMetaReader(
NewMetaReader(
meta,
store,
&backuppb.CipherInfo{
Expand Down
26 changes: 13 additions & 13 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type Client struct {
tlsConf *tls.Config
keepaliveConf keepalive.ClientParameters

databases map[string]*utils.Database
databases map[string]*metautil.Database
ddlJobs []*model.Job

// store tables need to rebase info like auto id and random id and so on after create table
Expand Down Expand Up @@ -358,7 +358,7 @@ func (rc *Client) InitBackupMeta(
backend *backuppb.StorageBackend,
reader *metautil.MetaReader) error {
if !backupMeta.IsRawKv {
databases, err := utils.LoadBackupTables(c, reader)
databases, err := metautil.LoadBackupTables(c, reader)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -525,23 +525,23 @@ func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]pd
}

// GetDatabases returns all databases.
func (rc *Client) GetDatabases() []*utils.Database {
dbs := make([]*utils.Database, 0, len(rc.databases))
func (rc *Client) GetDatabases() []*metautil.Database {
dbs := make([]*metautil.Database, 0, len(rc.databases))
for _, db := range rc.databases {
dbs = append(dbs, db)
}
return dbs
}

// GetDatabase returns a database by name.
func (rc *Client) GetDatabase(name string) *utils.Database {
func (rc *Client) GetDatabase(name string) *metautil.Database {
return rc.databases[name]
}

// HasBackedUpSysDB whether we have backed up system tables
// br backs system tables up since 5.1.0
func (rc *Client) HasBackedUpSysDB() bool {
temporaryDB := utils.TemporaryDBName(mysql.SystemDB)
temporaryDB := metautil.TemporaryDBName(mysql.SystemDB)
_, backedUp := rc.databases[temporaryDB.O]
return backedUp
}
Expand Down Expand Up @@ -927,8 +927,8 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau
log.Info("checking target cluster system table compatibility with backed up data")
privilegeTablesInBackup := make([]*metautil.Table, 0)
for _, table := range tables {
decodedSysDBName, ok := utils.GetSysDBCIStrName(table.DB.Name)
if ok && utils.IsSysDB(decodedSysDBName.L) && sysPrivilegeTableMap[table.Info.Name.L] != "" {
decodedSysDBName, ok := metautil.GetSysDBCIStrName(table.DB.Name)
if ok && metautil.IsSysDB(decodedSysDBName.L) && sysPrivilegeTableMap[table.Info.Name.L] != "" {
privilegeTablesInBackup = append(privilegeTablesInBackup, table)
}
}
Expand Down Expand Up @@ -1769,8 +1769,8 @@ func (rc *Client) FixIndex(ctx context.Context, schema, table, index string) err
}

sql := fmt.Sprintf("ADMIN RECOVER INDEX %s %s;",
utils.EncloseDBAndTable(schema, table),
utils.EncloseName(index))
metautil.EncloseDBAndTable(schema, table),
metautil.EncloseName(index))
log.Debug("Executing fix index sql.", zap.String("sql", sql))
err := rc.db.se.Execute(ctx, sql)
if err != nil {
Expand All @@ -1786,7 +1786,7 @@ func (rc *Client) FixIndicesOfTables(
onProgress func(),
) error {
for _, table := range fullBackupTables {
if name, ok := utils.GetSysDBName(table.DB.Name); utils.IsSysDB(name) && ok {
if name, ok := metautil.GetSysDBName(table.DB.Name); metautil.IsSysDB(name) && ok {
// skip system table for now
onProgress()
continue
Expand Down Expand Up @@ -2078,7 +2078,7 @@ func (rc *Client) InitSchemasReplaceForDDL(
dbMap := make(map[stream.OldID]*stream.DBReplace)

for _, t := range *tables {
name, _ := utils.GetSysDBName(t.DB.Name)
name, _ := metautil.GetSysDBName(t.DB.Name)
dbName := model.NewCIStr(name)
newDBInfo, exist := rc.GetDBSchema(rc.GetDomain(), dbName)
if !exist {
Expand Down Expand Up @@ -2624,7 +2624,7 @@ func (rc *Client) SetWithSysTable(withSysTable bool) {
}

// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
func MockClient(dbs map[string]*metautil.Database) *Client {
return &Client{databases: dbs}
}

Expand Down
3 changes: 1 addition & 2 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/iter"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -190,7 +189,7 @@ func TestCheckSysTableCompatibility(t *testing.T) {
dbSchema, isExist := info.SchemaByName(model.NewCIStr(mysql.SystemDB))
require.True(t, isExist)
tmpSysDB := dbSchema.Clone()
tmpSysDB.Name = utils.TemporaryDBName(mysql.SystemDB)
tmpSysDB.Name = metautil.TemporaryDBName(mysql.SystemDB)
sysDB := model.NewCIStr(mysql.SystemDB)
userTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("user"))
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 8608f42

Please sign in to comment.