Skip to content

Commit

Permalink
lightning: trim prefix when use gcs/s3 Storage WalkDir (#34319)
Browse files Browse the repository at this point in the history
close #34118
  • Loading branch information
Ehco1996 authored May 16, 2022
1 parent 32258fd commit 9392793
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 13 deletions.
4 changes: 2 additions & 2 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,10 @@ func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
if len(opt.ObjPrefix) != 0 {
return errors.New("gcs storage not support ObjPrefix for now")
}

prefix := path.Join(s.gcs.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}

query := &storage.Query{Prefix: prefix}
// only need each object's name and size
err := query.SetAttrSelection([]string{"Name", "Size"})
Expand All @@ -214,6 +212,8 @@ func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
// which can not be reuse in other API(Open/Read) directly.
// so we use TrimPrefix to filter Prefix for next Open/Read.
path := strings.TrimPrefix(attrs.Name, s.gcs.Prefix)
// trim the prefix '/' to ensure that the path returned is consistent with the local storage
path = strings.TrimPrefix(path, "/")
if err = fn(path, attrs.Size); err != nil {
return errors.Trace(err)
}
Expand Down
81 changes: 70 additions & 11 deletions br/pkg/storage/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,78 @@ func TestGCS(t *testing.T) {
require.NoError(t, err)
require.False(t, exist)

list := ""
var totalSize int64 = 0
err = stg.WalkDir(ctx, nil, func(name string, size int64) error {
list += name
totalSize += size
return nil
})
require.NoError(t, err)
require.Equal(t, "keykey1key2", list)
require.Equal(t, int64(42), totalSize)
checkWalkDir := func(stg *gcsStorage, opt *WalkOption) {
var totalSize int64 = 0
err = stg.WalkDir(ctx, opt, func(name string, size int64) error {
totalSize += size
// also test can use this path open file
_, err := stg.Open(ctx, name)
require.NoError(t, err)
return nil
})
require.NoError(t, err)
require.Equal(t, int64(42), totalSize)
}
// test right prefix without sub dir opt
{
checkWalkDir(stg, nil)
}

// test right prefix with sub dir opt
{
gcs := &backuppb.GCS{
Bucket: bucketName,
Prefix: "a/", // right prefix is /a/b/
StorageClass: "NEARLINE",
PredefinedAcl: "private",
CredentialsBlob: "Fake Credentials",
}
stg, err := newGCSStorage(ctx, gcs, &ExternalStorageOptions{
SendCredentials: false,
CheckPermissions: []Permission{AccessBuckets},
HTTPClient: server.HTTPClient(),
})
require.NoError(t, err)
checkWalkDir(stg, &WalkOption{SubDir: "b/"})
}

// test prefix without slash in new bucket without sub dir opt
{
gcs := &backuppb.GCS{
Bucket: bucketName,
Prefix: "a/b", // right prefix is "a/b/"
StorageClass: "NEARLINE",
PredefinedAcl: "private",
CredentialsBlob: "Fake Credentials",
}
stg, err := newGCSStorage(ctx, gcs, &ExternalStorageOptions{
SendCredentials: false,
CheckPermissions: []Permission{AccessBuckets},
HTTPClient: server.HTTPClient(),
})
require.NoError(t, err)
checkWalkDir(stg, nil)
}
// test prefix without slash in new bucket with sub dir opt
{
gcs := &backuppb.GCS{
Bucket: bucketName,
Prefix: "a", // right prefix is "a/b/"
StorageClass: "NEARLINE",
PredefinedAcl: "private",
CredentialsBlob: "Fake Credentials",
}
stg, err := newGCSStorage(ctx, gcs, &ExternalStorageOptions{
SendCredentials: false,
CheckPermissions: []Permission{AccessBuckets},
HTTPClient: server.HTTPClient(),
})
require.NoError(t, err)
checkWalkDir(stg, &WalkOption{SubDir: "b/"})
}

// test 1003 files
totalSize = 0
var totalSize int64 = 0
for i := 0; i < 1000; i += 1 {
err = stg.WriteFile(ctx, fmt.Sprintf("f%d", i), []byte("data"))
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
// which can not be reuse in other API(Open/Read) directly.
// so we use TrimPrefix to filter Prefix for next Open/Read.
path := strings.TrimPrefix(*r.Key, rs.options.Prefix)
// trim the prefix '/' to ensure that the path returned is consistent with the local storage
path = strings.TrimPrefix(path, "/")
itemSize := *r.Size

// filter out s3's empty directory items
Expand Down

0 comments on commit 9392793

Please sign in to comment.