importinto: check access of cloud storage uri (#47206)
ref #46704
D3Hunter authored Sep 22, 2023
1 parent e8247b5 commit 9d29580
Showing 7 changed files with 155 additions and 24 deletions.
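
At a high level, this commit makes the IMPORT INTO precheck open the global-sort cloud storage URI up front and probe the required permissions before the job starts. The sketch below condenses that flow using only names that appear in this diff; scheme validation and TiDB error wrapping are omitted, and the helper name is hypothetical, so treat it as an illustration rather than the exact implementation.

package main

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/storage"
)

// checkCloudStorageAccess mirrors what checkGlobalSortStorePrivilege (below)
// does for an S3 backend, minus error wrapping and the scheme check.
func checkCloudStorageAccess(ctx context.Context, cloudStorageURI string) error {
	u, err := storage.ParseRawURL(cloudStorageURI)
	if err != nil {
		return err
	}
	backend, err := storage.ParseBackendFromURL(u, nil)
	if err != nil {
		return err
	}
	// storage.New runs every requested permission check against the backend
	// (see permissionCheckFn in br/pkg/storage/s3.go) and fails if any is denied.
	_, err = storage.New(ctx, backend, &storage.ExternalStorageOptions{
		CheckPermissions: []storage.Permission{
			storage.GetObject,
			storage.ListObjects,
			storage.PutAndDeleteObject, // permission added by this commit
		},
	})
	return err
}
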
2 changes: 1 addition & 1 deletion br/pkg/storage/BUILD.bazel
@@ -81,7 +81,7 @@ go_test(
],
embed = [":storage"],
flaky = True,
shard_count = 49,
shard_count = 50,
deps = [
"//br/pkg/mock",
"@com_github_aws_aws_sdk_go//aws",
83 changes: 61 additions & 22 deletions br/pkg/storage/s3.go
@@ -27,6 +27,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
@@ -64,10 +65,11 @@ const (
domainAliyun = "aliyuncs.com"
)

var permissionCheckFn = map[Permission]func(*s3.S3, *backuppb.S3) error{
AccessBuckets: checkS3Bucket,
ListObjects: listObjects,
GetObject: getObject,
var permissionCheckFn = map[Permission]func(context.Context, s3iface.S3API, *backuppb.S3) error{
AccessBuckets: s3BucketExistenceCheck,
ListObjects: listObjectsCheck,
GetObject: getObjectCheck,
PutAndDeleteObject: PutAndDeleteObjectCheck,
}

// WriteBufferSize is the size of the buffer used for writing. (64K may be a better choice)
@@ -410,7 +412,7 @@ func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *ExternalStora
}

for _, p := range opts.CheckPermissions {
err := permissionCheckFn[p](c, &qs)
err := permissionCheckFn[p](ctx, c, &qs)
if err != nil {
return nil, errors.Annotatef(berrors.ErrStorageInvalidPermission, "check permission %s failed due to %v", p, err)
}
@@ -428,16 +430,16 @@
}

// checkBucket checks if a bucket exists.
func checkS3Bucket(svc *s3.S3, qs *backuppb.S3) error {
func s3BucketExistenceCheck(_ context.Context, svc s3iface.S3API, qs *backuppb.S3) error {
input := &s3.HeadBucketInput{
Bucket: aws.String(qs.Bucket),
}
_, err := svc.HeadBucket(input)
return errors.Trace(err)
}

// listObjects checks the permission of listObjects
func listObjects(svc *s3.S3, qs *backuppb.S3) error {
// listObjectsCheck checks the permission of listObjects
func listObjectsCheck(_ context.Context, svc s3iface.S3API, qs *backuppb.S3) error {
input := &s3.ListObjectsInput{
Bucket: aws.String(qs.Bucket),
Prefix: aws.String(qs.Prefix),
@@ -450,8 +452,8 @@ func listObjects(svc *s3.S3, qs *backuppb.S3) error {
return nil
}

// getObject checks the permission of getObject
func getObject(svc *s3.S3, qs *backuppb.S3) error {
// getObjectCheck checks the permission of getObject
func getObjectCheck(_ context.Context, svc s3iface.S3API, qs *backuppb.S3) error {
input := &s3.GetObjectInput{
Bucket: aws.String(qs.Bucket),
Key: aws.String("not-exists"),
@@ -469,6 +471,38 @@ func getObject(svc *s3.S3, qs *backuppb.S3) error {
return nil
}

// PutAndDeleteObjectCheck checks the permission of putObject
// S3 API doesn't provide a way to check the permission, we have to put an
// object to check the permission.
// exported for testing.
func PutAndDeleteObjectCheck(ctx context.Context, svc s3iface.S3API, options *backuppb.S3) (err error) {
file := fmt.Sprintf("access-check/%s", uuid.New().String())
defer func() {
// we always delete the object used for permission check,
// even on error, since the object might be created successfully even
// when it returns an error.
input := &s3.DeleteObjectInput{
Bucket: aws.String(options.Bucket),
Key: aws.String(options.Prefix + file),
}
_, err2 := svc.DeleteObjectWithContext(ctx, input)
if aerr, ok := err2.(awserr.Error); ok {
if aerr.Code() != "NoSuchKey" {
log.Warn("failed to delete object used for permission check",
zap.String("bucket", options.Bucket),
zap.String("key", *input.Key), zap.Error(err2))
}
}
if err == nil {
err = errors.Trace(err2)
}
}()
// when no permission, aws returns err with code "AccessDenied"
input := buildPutObjectInput(options, file, []byte("check"))
_, err = svc.PutObjectWithContext(ctx, input)
return errors.Trace(err)
}

func (rs *S3Storage) IsObjectLockEnabled() bool {
input := &s3.GetObjectLockConfigurationInput{
Bucket: aws.String(rs.options.Bucket),
@@ -486,25 +520,30 @@ func (rs *S3Storage) IsObjectLockEnabled() bool {
return false
}

// WriteFile writes data to a file to storage.
func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) error {
func buildPutObjectInput(options *backuppb.S3, file string, data []byte) *s3.PutObjectInput {
input := &s3.PutObjectInput{
Body: aws.ReadSeekCloser(bytes.NewReader(data)),
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + file),
Bucket: aws.String(options.Bucket),
Key: aws.String(options.Prefix + file),
}
if rs.options.Acl != "" {
input = input.SetACL(rs.options.Acl)
if options.Acl != "" {
input = input.SetACL(options.Acl)
}
if rs.options.Sse != "" {
input = input.SetServerSideEncryption(rs.options.Sse)
if options.Sse != "" {
input = input.SetServerSideEncryption(options.Sse)
}
if rs.options.SseKmsKeyId != "" {
input = input.SetSSEKMSKeyId(rs.options.SseKmsKeyId)
if options.SseKmsKeyId != "" {
input = input.SetSSEKMSKeyId(options.SseKmsKeyId)
}
if rs.options.StorageClass != "" {
input = input.SetStorageClass(rs.options.StorageClass)
if options.StorageClass != "" {
input = input.SetStorageClass(options.StorageClass)
}
return input
}

// WriteFile writes data to a file to storage.
func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) error {
input := buildPutObjectInput(rs.options, file, data)
// we don't need to calculate contentMD5 if s3 object lock enabled.
// since aws-go-sdk already did it in #computeBodyHashes
// https://github.com/aws/aws-sdk-go/blob/bcb2cf3fc2263c8c28b3119b07d2dbb44d7c93a0/service/s3/body_hash.go#L30
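
Because PutAndDeleteObjectCheck is exported, it can also be exercised on its own with a plain aws-sdk-go client (*s3.S3 satisfies s3iface.S3API). A rough sketch follows; the region, bucket, and prefix are placeholders, and in the real code path the check runs through NewS3Storage's CheckPermissions loop shown above.

package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/tidb/br/pkg/storage"
)

func main() {
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String("us-east-1"), // placeholder region
	}))

	// The check uploads a probe object under "<prefix>access-check/<uuid>" and
	// deletes it afterwards, so the credentials need both PutObject and DeleteObject.
	err := storage.PutAndDeleteObjectCheck(context.Background(), s3.New(sess), &backuppb.S3{
		Bucket: "my-bucket", // placeholder bucket
		Prefix: "import/",   // placeholder prefix
	})
	if err != nil {
		log.Fatalf("put/delete permission check failed: %v", err)
	}
}
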
25 changes: 25 additions & 0 deletions br/pkg/storage/s3_test.go
@@ -693,6 +693,31 @@ func TestOpenReadSlowly(t *testing.T) {
require.Equal(t, []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ"), res)
}

func TestPutAndDeleteObjectCheck(t *testing.T) {
s := createS3Suite(t)
ctx := aws.BackgroundContext()

s.s3.EXPECT().PutObjectWithContext(gomock.Any(), gomock.Any()).Return(nil, nil)
s.s3.EXPECT().DeleteObjectWithContext(gomock.Any(), gomock.Any()).Return(nil, nil)
require.NoError(t, PutAndDeleteObjectCheck(ctx, s.s3, &backuppb.S3{}))

s.s3.EXPECT().PutObjectWithContext(gomock.Any(), gomock.Any()).Return(nil, errors.New("mock put error"))
s.s3.EXPECT().DeleteObjectWithContext(gomock.Any(), gomock.Any()).Return(nil, nil)
require.ErrorContains(t, PutAndDeleteObjectCheck(ctx, s.s3, &backuppb.S3{}), "mock put error")

s.s3.EXPECT().PutObjectWithContext(gomock.Any(), gomock.Any()).Return(nil, nil)
s.s3.EXPECT().DeleteObjectWithContext(gomock.Any(), gomock.Any()).Return(nil, errors.New("mock del error"))
require.ErrorContains(t, PutAndDeleteObjectCheck(ctx, s.s3, &backuppb.S3{}), "mock del error")

s.s3.EXPECT().PutObjectWithContext(gomock.Any(), gomock.Any()).Return(nil, nil)
s.s3.EXPECT().DeleteObjectWithContext(gomock.Any(), gomock.Any()).Return(nil, awserr.New("AccessDenied", "", nil))
require.ErrorContains(t, PutAndDeleteObjectCheck(ctx, s.s3, &backuppb.S3{}), "AccessDenied")

s.s3.EXPECT().PutObjectWithContext(gomock.Any(), gomock.Any()).Return(nil, errors.New("mock put error"))
s.s3.EXPECT().DeleteObjectWithContext(gomock.Any(), gomock.Any()).Return(nil, errors.New("mock del error"))
require.ErrorContains(t, PutAndDeleteObjectCheck(ctx, s.s3, &backuppb.S3{}), "mock put error")
}

// TestOpenSeek checks that Seek is implemented correctly.
func TestOpenSeek(t *testing.T) {
s := createS3Suite(t)
3 changes: 3 additions & 0 deletions br/pkg/storage/storage.go
@@ -30,6 +30,9 @@ const (
GetObject Permission = "GetObject"
// PutObject represents PutObject permission
PutObject Permission = "PutObject"
// PutAndDeleteObject represents PutAndDeleteObject permission
// we cannot check DeleteObject permission alone, so we use PutAndDeleteObject instead.
PutAndDeleteObject Permission = "PutAndDeleteObject"

DefaultRequestConcurrency uint = 128
)
2 changes: 2 additions & 0 deletions executor/importer/BUILD.bazel
@@ -112,6 +112,8 @@ go_test(
"//util/logutil",
"//util/mock",
"//util/sqlexec",
"@com_github_johannesboyne_gofakes3//:gofakes3",
"@com_github_johannesboyne_gofakes3//backend/s3mem",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
45 changes: 44 additions & 1 deletion executor/importer/precheck.go
@@ -21,13 +21,15 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/utils"
tidb "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror/exeerrors"
"github.com/pingcap/tidb/util/etcd"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/sqlexec"
)

@@ -53,7 +55,13 @@ func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec
if err := e.checkTableEmpty(ctx, conn); err != nil {
return err
}
return e.checkCDCPiTRTasks(ctx)
if err := e.checkCDCPiTRTasks(ctx); err != nil {
return err
}
if e.IsGlobalSort() {
return e.checkGlobalSortStorePrivilege(ctx)
}
return nil
}

func (e *LoadDataController) checkTotalFileSize() error {
@@ -114,6 +122,41 @@ func (*LoadDataController) checkCDCPiTRTasks(ctx context.Context) error {
return nil
}

func (e *LoadDataController) checkGlobalSortStorePrivilege(ctx context.Context) error {
// we need read/put/delete/list privileges on global sort store.
// only support S3 now.
target := "cloud storage"
cloudStorageURL, err3 := storage.ParseRawURL(e.Plan.CloudStorageURI)
if err3 != nil {
return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(target, err3.Error())
}
b, err2 := storage.ParseBackendFromURL(cloudStorageURL, nil)
if err2 != nil {
return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(target, GetMsgFromBRError(err2))
}

if b.GetS3() == nil && b.GetGcs() == nil {
// we only support S3 now, but in test we are using GCS.
return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs("unsupported cloud storage uri scheme: " + cloudStorageURL.Scheme)
}

opt := &storage.ExternalStorageOptions{
CheckPermissions: []storage.Permission{
storage.GetObject,
storage.ListObjects,
storage.PutAndDeleteObject,
},
}
if intest.InTest {
opt.NoCredentials = true
}
_, err := storage.New(ctx, b, opt)
if err != nil {
return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs("check cloud storage uri access: " + err.Error())
}
return nil
}

func getEtcdClient() (*etcd.Client, error) {
tidbCfg := tidb.GetGlobalConfig()
tls, err := util.NewTLSConfig(
19 changes: 19 additions & 0 deletions executor/importer/precheck_test.go
@@ -18,10 +18,13 @@ import (
"context"
"fmt"
"math/rand"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/johannesboyne/gofakes3"
"github.com/johannesboyne/gofakes3/backend/s3mem"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/executor/importer"
@@ -145,4 +148,20 @@ func TestCheckRequirements(t *testing.T) {
_, err = etcdCli.Delete(ctx, cdcKey)
require.NoError(t, err)
require.NoError(t, c.CheckRequirements(ctx, conn))

// with global sort
c.Plan.CloudStorageURI = ":"
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataInvalidURI)
c.Plan.CloudStorageURI = "sdsdsdsd://sdsdsdsd"
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataInvalidURI)
c.Plan.CloudStorageURI = "local:///tmp"
require.ErrorContains(t, c.CheckRequirements(ctx, conn), "unsupported cloud storage uri scheme: local")
// this mock cannot mock credential check, so we just skip it.
backend := s3mem.New()
faker := gofakes3.New(backend)
ts := httptest.NewServer(faker.Server())
defer ts.Close()
require.NoError(t, backend.CreateBucket("test-bucket"))
c.Plan.CloudStorageURI = fmt.Sprintf("s3://test-bucket/path?region=us-east-1&endpoint=%s&access-key=xxxxxx&secret-access-key=xxxxxx", ts.URL)
require.NoError(t, c.CheckRequirements(ctx, conn))
}
