Skip to content

Commit

Permalink
Extracted SSE injection from UserBucketClient into SSEBucketClient (c…
Browse files Browse the repository at this point in the history
…ortexproject#3911)

* Extracted SSE injection from UserBucketClient into SSEBucketClient

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixed tools

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Simplified SSEBucketClient

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored and harry671003 committed Mar 11, 2021
1 parent 94d4f22 commit 471b912
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 260 deletions.
4 changes: 2 additions & 2 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b

// cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
// is updated accordingly.
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket *bucket.UserBucketClient, userLogger log.Logger) {
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
for blockID, blockErr := range partials {
// We can safely delete only blocks which are partial because the meta.json is missing.
if !errors.Is(blockErr, bucketindex.ErrBlockMetaNotFound) {
Expand Down Expand Up @@ -411,7 +411,7 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map
}

// applyUserRetentionPeriod marks blocks for deletion which have aged past the retention period.
func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket *bucket.UserBucketClient, userLogger log.Logger) {
func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket objstore.Bucket, userLogger log.Logger) {
// The retention period of zero is a special value indicating to never delete.
if retention <= 0 {
return
Expand Down
143 changes: 143 additions & 0 deletions pkg/storage/bucket/sse_bucket_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package bucket

import (
"context"
"io"

"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/s3"

cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3"
)

// TenantConfigProvider defines a per-tenant config provider.
type TenantConfigProvider interface {
// S3SSEType returns the per-tenant S3 SSE type.
S3SSEType(userID string) string

// S3SSEKMSKeyID returns the per-tenant S3 KMS-SSE key id or an empty string if not set.
S3SSEKMSKeyID(userID string) string

// S3SSEKMSEncryptionContext returns the per-tenant S3 KMS-SSE key id or an empty string if not set.
S3SSEKMSEncryptionContext(userID string) string
}

// SSEBucketClient is a wrapper around a objstore.BucketReader that configures the object
// storage server-side encryption (SSE) for a given user.
type SSEBucketClient struct {
userID string
bucket objstore.Bucket
cfgProvider TenantConfigProvider
}

// NewSSEBucketClient makes a new SSEBucketClient. The cfgProvider can be nil.
func NewSSEBucketClient(userID string, bucket objstore.Bucket, cfgProvider TenantConfigProvider) *SSEBucketClient {
return &SSEBucketClient{
userID: userID,
bucket: bucket,
cfgProvider: cfgProvider,
}
}

// Close implements objstore.Bucket.
func (b *SSEBucketClient) Close() error {
return b.bucket.Close()
}

// Upload the contents of the reader as an object into the bucket.
func (b *SSEBucketClient) Upload(ctx context.Context, name string, r io.Reader) error {
if sse, err := b.getCustomS3SSEConfig(); err != nil {
return err
} else if sse != nil {
// If the underlying bucket client is not S3 and a custom S3 SSE config has been
// provided, the config option will be ignored.
ctx = s3.ContextWithSSEConfig(ctx, sse)
}

return b.bucket.Upload(ctx, name, r)
}

// Delete implements objstore.Bucket.
func (b *SSEBucketClient) Delete(ctx context.Context, name string) error {
return b.bucket.Delete(ctx, name)
}

// Name implements objstore.Bucket.
func (b *SSEBucketClient) Name() string {
return b.bucket.Name()
}

func (b *SSEBucketClient) getCustomS3SSEConfig() (encrypt.ServerSide, error) {
if b.cfgProvider == nil {
return nil, nil
}

// No S3 SSE override if the type override hasn't been provided.
sseType := b.cfgProvider.S3SSEType(b.userID)
if sseType == "" {
return nil, nil
}

cfg := cortex_s3.SSEConfig{
Type: sseType,
KMSKeyID: b.cfgProvider.S3SSEKMSKeyID(b.userID),
KMSEncryptionContext: b.cfgProvider.S3SSEKMSEncryptionContext(b.userID),
}

sse, err := cfg.BuildMinioConfig()
if err != nil {
return nil, errors.Wrapf(err, "unable to customise S3 SSE config for tenant %s", b.userID)
}

return sse, nil
}

// Iter implements objstore.Bucket.
func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
return b.bucket.Iter(ctx, dir, f, options...)
}

// Get implements objstore.Bucket.
func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bucket.Get(ctx, name)
}

// GetRange implements objstore.Bucket.
func (b *SSEBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.bucket.GetRange(ctx, name, off, length)
}

// Exists implements objstore.Bucket.
func (b *SSEBucketClient) Exists(ctx context.Context, name string) (bool, error) {
return b.bucket.Exists(ctx, name)
}

// IsObjNotFoundErr implements objstore.Bucket.
func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool {
return b.bucket.IsObjNotFoundErr(err)
}

// Attributes implements objstore.Bucket.
func (b *SSEBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
return b.bucket.Attributes(ctx, name)
}

// ReaderWithExpectedErrs implements objstore.Bucket.
func (b *SSEBucketClient) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
return b.WithExpectedErrs(fn)
}

// WithExpectedErrs implements objstore.Bucket.
func (b *SSEBucketClient) WithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.Bucket {
if ib, ok := b.bucket.(objstore.InstrumentedBucket); ok {
return &SSEBucketClient{
userID: b.userID,
bucket: ib.WithExpectedErrs(fn),
cfgProvider: b.cfgProvider,
}
}

return b
}
124 changes: 124 additions & 0 deletions pkg/storage/bucket/sse_bucket_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package bucket

import (
"context"
"encoding/base64"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/bucket/s3"
"github.com/cortexproject/cortex/pkg/util/flagext"
)

func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) {
tests := map[string]struct {
withExpectedErrs bool
}{
"default client": {
withExpectedErrs: false,
},
"client with expected errors": {
withExpectedErrs: true,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
const (
kmsKeyID = "ABC"
kmsEncryptionContext = "{\"department\":\"10103.0\"}"
)

var req *http.Request

// Start a fake HTTP server which simulate S3.
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Keep track of the received request.
req = r

w.WriteHeader(http.StatusOK)
}))
defer srv.Close()

s3Cfg := s3.Config{
Endpoint: srv.Listener.Addr().String(),
Region: "test",
BucketName: "test-bucket",
SecretAccessKey: flagext.Secret{Value: "test"},
AccessKeyID: "test",
Insecure: true,
}

s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger())
require.NoError(t, err)

// Configure the config provider with NO KMS key ID.
cfgProvider := &mockTenantConfigProvider{}

var sseBkt objstore.Bucket
if testData.withExpectedErrs {
sseBkt = NewSSEBucketClient("user-1", s3Client, cfgProvider).WithExpectedErrs(s3Client.IsObjNotFoundErr)
} else {
sseBkt = NewSSEBucketClient("user-1", s3Client, cfgProvider)
}

err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test"))
require.NoError(t, err)

// Ensure NO KMS header has been injected.
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption"))
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id"))
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-context"))

// Configure the config provider with a KMS key ID and without encryption context.
cfgProvider.s3SseType = s3.SSEKMS
cfgProvider.s3KmsKeyID = kmsKeyID

err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test"))
require.NoError(t, err)

// Ensure the KMS header has been injected.
assert.Equal(t, "aws:kms", req.Header.Get("x-amz-server-side-encryption"))
assert.Equal(t, kmsKeyID, req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id"))
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-context"))

// Configure the config provider with a KMS key ID and encryption context.
cfgProvider.s3SseType = s3.SSEKMS
cfgProvider.s3KmsKeyID = kmsKeyID
cfgProvider.s3KmsEncryptionContext = kmsEncryptionContext

err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test"))
require.NoError(t, err)

// Ensure the KMS header has been injected.
assert.Equal(t, "aws:kms", req.Header.Get("x-amz-server-side-encryption"))
assert.Equal(t, kmsKeyID, req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id"))
assert.Equal(t, base64.StdEncoding.EncodeToString([]byte(kmsEncryptionContext)), req.Header.Get("x-amz-server-side-encryption-context"))
})
}
}

type mockTenantConfigProvider struct {
s3SseType string
s3KmsKeyID string
s3KmsEncryptionContext string
}

func (m *mockTenantConfigProvider) S3SSEType(_ string) string {
return m.s3SseType
}

func (m *mockTenantConfigProvider) S3SSEKMSKeyID(_ string) string {
return m.s3KmsKeyID
}

func (m *mockTenantConfigProvider) S3SSEKMSEncryptionContext(_ string) string {
return m.s3KmsEncryptionContext
}
Loading

0 comments on commit 471b912

Please sign in to comment.