Skip to content

Commit

Permalink
storage/: refactor storage.ExternalStorage interface (pingcap#676) (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 15, 2021
1 parent b936674 commit 8afeee8
Show file tree
Hide file tree
Showing 22 changed files with 404 additions and 193 deletions.
8 changes: 4 additions & 4 deletions cmd/br/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func newCheckSumCommand() *cobra.Command {
)

var data []byte
data, err = s.Read(ctx, file.Name)
data, err = s.ReadFile(ctx, file.Name)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func decodeBackupMetaCommand() *cobra.Command {
if err != nil {
return errors.Trace(err)
}
err = s.Write(ctx, utils.MetaJSONFile, backupMetaJSON)
err = s.WriteFile(ctx, utils.MetaJSONFile, backupMetaJSON)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -327,7 +327,7 @@ func encodeBackupMetaCommand() *cobra.Command {
return errors.Trace(err)
}

metaData, err := s.Read(ctx, utils.MetaJSONFile)
metaData, err := s.ReadFile(ctx, utils.MetaJSONFile)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -347,7 +347,7 @@ func encodeBackupMetaCommand() *cobra.Command {
// Do not overwrite origin meta file
fileName += "_from_json"
}
err = s.Write(ctx, fileName, backupMeta)
err = s.WriteFile(ctx, fileName, backupMeta)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod1
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ require (
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20210308075244-560097d1309b
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8
github.com/pingcap/parser v0.0.0-20210303062609-d1d977c9ceed
github.com/pingcap/tidb v1.1.0-beta.0.20210308110454-a7199ff91648
github.com/pingcap/parser v0.0.0-20210421190550-451a84cf120a
github.com/pingcap/tidb v1.1.0-beta.0.20210714111333-67b641d5036c
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3
github.com/prometheus/client_golang v1.5.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum1
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,13 @@ github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIf
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20210303062609-d1d977c9ceed h1:+ENLMPNRG8+/YGNJChC5QRgfrcmFnsrHl9WoVLXRZok=
github.com/pingcap/parser v0.0.0-20210303062609-d1d977c9ceed/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE=
github.com/pingcap/parser v0.0.0-20210421190550-451a84cf120a h1:przqXMJBDMD2tHLRyFTCsHkfcme4l1m43bEb7tQOi7o=
github.com/pingcap/parser v0.0.0-20210421190550-451a84cf120a/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE=
github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
github.com/pingcap/sysutil v0.0.0-20201130064824-f0c8aa6a6966 h1:JI0wOAb8aQML0vAVLHcxTEEC0VIwrk6gtw3WjbHvJLA=
github.com/pingcap/sysutil v0.0.0-20201130064824-f0c8aa6a6966/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
github.com/pingcap/tidb v1.1.0-beta.0.20210308110454-a7199ff91648 h1:O/XlgK69L7EHkyKck32bCt17plzZnPo9J+Vp7QNe8Ww=
github.com/pingcap/tidb v1.1.0-beta.0.20210308110454-a7199ff91648/go.mod h1:MCJOHX8pPlU9mMQfyTko9EDg1mDwohW0+fvBE01WUC0=
github.com/pingcap/tidb v1.1.0-beta.0.20210714111333-67b641d5036c h1:5AQ7QTwExijm6OayuSfMqCf2TICZGGpeewazNQHW4k4=
github.com/pingcap/tidb v1.1.0-beta.0.20210714111333-67b641d5036c/go.mod h1:VyOYIKGCJVSUScBc5MIgh/FMfQ7nIfTG739vTGBqFWU=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3 h1:ESL3eIt1kUt8IMvR1011ejZlAyDcOzw89ARvVHvpD5k=
Expand Down
4 changes: 2 additions & 2 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64)

// SetLockFile set write lock file.
func (bc *Client) SetLockFile(ctx context.Context) error {
return bc.storage.Write(ctx, utils.LockFile,
return bc.storage.WriteFile(ctx, utils.LockFile,
[]byte("DO NOT DELETE\n"+
"This file exists to remind other backup jobs won't use this path"))
}
Expand Down Expand Up @@ -230,7 +230,7 @@ func (bc *Client) SaveBackupMeta(ctx context.Context, backupMeta *backuppb.Backu
}
time.Sleep(3 * time.Second)
})
return bc.storage.Write(ctx, utils.MetaFile, backupMetaData)
return bc.storage.WriteFile(ctx, utils.MetaFile, backupMetaData)
}

// GetClusterID returns the cluster ID of the tidb cluster to backup.
Expand Down
13 changes: 8 additions & 5 deletions pkg/cdclog/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewEventPuller(
if len(ddlFiles) == 0 {
log.Info("There is no ddl file to restore")
} else {
data, err := storage.Read(ctx, ddlFiles[0])
data, err := storage.ReadFile(ctx, ddlFiles[0])
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -73,7 +73,7 @@ func NewEventPuller(
if len(rowChangedFiles) == 0 {
log.Info("There is no row changed file to restore")
} else {
data, err := storage.Read(ctx, rowChangedFiles[0])
data, err := storage.ReadFile(ctx, rowChangedFiles[0])
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -105,13 +105,16 @@ func NewEventPuller(
// PullOneEvent pulls one event in ts order.
// The Next event which can be DDL item or Row changed Item depends on next commit ts.
func (e *EventPuller) PullOneEvent(ctx context.Context) (*SortItem, error) {
var err error
var (
data []byte
err error
)
// ddl exists
if e.ddlDecoder != nil {
// current file end, read next file if next file exists
if !e.ddlDecoder.HasNext() && e.ddlFileIndex < len(e.ddlFiles) {
path := e.ddlFiles[e.ddlFileIndex]
data, err := e.storage.Read(ctx, path)
data, err = e.storage.ReadFile(ctx, path)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -136,7 +139,7 @@ func (e *EventPuller) PullOneEvent(ctx context.Context) (*SortItem, error) {
// current file end, read next file if next file exists
if !e.rowChangedDecoder.HasNext() && e.rowChangedFileIndex < len(e.rowChangedFiles) {
path := e.rowChangedFiles[e.rowChangedFileIndex]
data, err := e.storage.Read(ctx, path)
data, err = e.storage.ReadFile(ctx, path)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/mydump/parquet_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func OpenParquetReader(
size int64,
) (source.ParquetFile, error) {
if size <= smallParquetFileThreshold {
fileBytes, err := store.Read(ctx, path)
fileBytes, err := store.ReadFile(ctx, path)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,7 @@ func (s *restoreSchemaSuite) SetUpSuite(c *C) {
fakeDBName := "fakedb"
// please follow the `mydump.defaultFileRouteRules`, matches files like '{schema}-schema-create.sql'
fakeFileName := fmt.Sprintf("%s-schema-create.sql", fakeDBName)
err = store.Write(ctx, fakeFileName, []byte(fmt.Sprintf("CREATE DATABASE %s;", fakeDBName)))
err = store.WriteFile(ctx, fakeFileName, []byte(fmt.Sprintf("CREATE DATABASE %s;", fakeDBName)))
c.Assert(err, IsNil)
// restore table schema files
fakeTableFilesCount := 8
Expand All @@ -1349,7 +1349,7 @@ func (s *restoreSchemaSuite) SetUpSuite(c *C) {
fakeFileName := fmt.Sprintf("%s.%s-schema.sql", fakeDBName, fakeTableName)

fakeFileContent := fmt.Sprintf("CREATE TABLE %s(i TINYINT);", fakeTableName)
err = store.Write(ctx, fakeFileName, []byte(fakeFileContent))
err = store.WriteFile(ctx, fakeFileName, []byte(fakeFileContent))
c.Assert(err, IsNil)

node, err := p.ParseOneStmt(fakeFileContent, "", "")
Expand All @@ -1367,7 +1367,7 @@ func (s *restoreSchemaSuite) SetUpSuite(c *C) {
// please follow the `mydump.defaultFileRouteRules`, matches files like '{schema}.{table}-schema-view.sql'
fakeFileName := fmt.Sprintf("%s.%s-schema-view.sql", fakeDBName, fakeViewName)
fakeFileContent := []byte(fmt.Sprintf("CREATE ALGORITHM=UNDEFINED VIEW `%s` (`i`) AS SELECT `i` FROM `%s`.`%s`;", fakeViewName, fakeDBName, fmt.Sprintf("tbl%d", i)))
err = store.Write(ctx, fakeFileName, fakeFileContent)
err = store.WriteFile(ctx, fakeFileName, fakeFileContent)
c.Assert(err, IsNil)
}
config := config.NewConfig()
Expand Down
45 changes: 23 additions & 22 deletions pkg/mock/storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions pkg/restore/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (l *LogClient) doDBDDLJob(ctx context.Context, ddls []string) error {
}

for _, path := range ddls {
data, err := l.restoreClient.storage.Read(ctx, path)
data, err := l.restoreClient.storage.ReadFile(ctx, path)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -572,7 +572,7 @@ func (l *LogClient) doWriteAndIngest(ctx context.Context, kvs kv.Pairs, region *
if !needRetry {
log.Warn("ingest failed noretry", zap.Error(errIngest), logutil.SSTMeta(meta),
logutil.Region(region.Region), zap.Any("leader", region.Leader))
// met non-retryable error retry whole Write procedure
// met non-retryable error retry whole WriteFile procedure
return errIngest
}
// retry with not leader and epoch not match error
Expand Down Expand Up @@ -787,7 +787,7 @@ func (l *LogClient) restoreTableFromPuller(
if item == nil {
log.Info("[restoreFromPuller] nothing in this puller, we should stop and flush",
zap.Int64("table id", tableID))
err := l.applyKVChanges(ctx, tableID)
err = l.applyKVChanges(ctx, tableID)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -808,7 +808,7 @@ func (l *LogClient) restoreTableFromPuller(
zap.Uint64("end ts", l.endTS),
zap.Uint64("item ts", item.TS),
zap.Int64("table id", tableID))
err := l.applyKVChanges(ctx, tableID)
err = l.applyKVChanges(ctx, tableID)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -819,7 +819,7 @@ func (l *LogClient) restoreTableFromPuller(
log.Debug("[restoreFromPuller] filter item because later drop schema will affect on this item",
zap.Any("item", item),
zap.Int64("table id", tableID))
err := l.applyKVChanges(ctx, tableID)
err = l.applyKVChanges(ctx, tableID)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -931,7 +931,7 @@ func (l *LogClient) RestoreLogData(ctx context.Context, dom *domain.Domain) erro
// 3. Encode and ingest data to tikv

// parse meta file
data, err := l.restoreClient.storage.Read(ctx, metaFile)
data, err := l.restoreClient.storage.ReadFile(ctx, metaFile)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 8afeee8

Please sign in to comment.