From 471b91234ed08c782c4122792cfc4d22a2487c7e Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 5 Mar 2021 10:50:27 +0100 Subject: [PATCH] Extracted SSE injection from UserBucketClient into SSEBucketClient (#3911) * Extracted SSE injection from UserBucketClient into SSEBucketClient Signed-off-by: Marco Pracucci * Fixed tools Signed-off-by: Marco Pracucci * Simplified SSEBucketClient Signed-off-by: Marco Pracucci --- pkg/compactor/blocks_cleaner.go | 4 +- pkg/storage/bucket/sse_bucket_client.go | 143 ++++++++++++++++ pkg/storage/bucket/sse_bucket_client_test.go | 124 ++++++++++++++ pkg/storage/bucket/user_bucket_client.go | 159 +----------------- pkg/storage/bucket/user_bucket_client_test.go | 102 ----------- tools/blocksconvert/builder/builder.go | 2 +- tools/thanosconvert/thanosconvert.go | 2 +- 7 files changed, 276 insertions(+), 260 deletions(-) create mode 100644 pkg/storage/bucket/sse_bucket_client.go create mode 100644 pkg/storage/bucket/sse_bucket_client_test.go delete mode 100644 pkg/storage/bucket/user_bucket_client_test.go diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index f6599e8190..0ebd79aa83 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -376,7 +376,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b // cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map // is updated accordingly. -func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket *bucket.UserBucketClient, userLogger log.Logger) { +func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) { for blockID, blockErr := range partials { // We can safely delete only blocks which are partial because the meta.json is missing. if !errors.Is(blockErr, bucketindex.ErrBlockMetaNotFound) { @@ -411,7 +411,7 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map } // applyUserRetentionPeriod marks blocks for deletion which have aged past the retention period. -func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket *bucket.UserBucketClient, userLogger log.Logger) { +func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket objstore.Bucket, userLogger log.Logger) { // The retention period of zero is a special value indicating to never delete. if retention <= 0 { return diff --git a/pkg/storage/bucket/sse_bucket_client.go b/pkg/storage/bucket/sse_bucket_client.go new file mode 100644 index 0000000000..88b1a8a430 --- /dev/null +++ b/pkg/storage/bucket/sse_bucket_client.go @@ -0,0 +1,143 @@ +package bucket + +import ( + "context" + "io" + + "github.com/minio/minio-go/v7/pkg/encrypt" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/objstore/s3" + + cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3" +) + +// TenantConfigProvider defines a per-tenant config provider. +type TenantConfigProvider interface { + // S3SSEType returns the per-tenant S3 SSE type. + S3SSEType(userID string) string + + // S3SSEKMSKeyID returns the per-tenant S3 KMS-SSE key id or an empty string if not set. + S3SSEKMSKeyID(userID string) string + + // S3SSEKMSEncryptionContext returns the per-tenant S3 KMS-SSE key id or an empty string if not set. + S3SSEKMSEncryptionContext(userID string) string +} + +// SSEBucketClient is a wrapper around a objstore.BucketReader that configures the object +// storage server-side encryption (SSE) for a given user. +type SSEBucketClient struct { + userID string + bucket objstore.Bucket + cfgProvider TenantConfigProvider +} + +// NewSSEBucketClient makes a new SSEBucketClient. The cfgProvider can be nil. +func NewSSEBucketClient(userID string, bucket objstore.Bucket, cfgProvider TenantConfigProvider) *SSEBucketClient { + return &SSEBucketClient{ + userID: userID, + bucket: bucket, + cfgProvider: cfgProvider, + } +} + +// Close implements objstore.Bucket. +func (b *SSEBucketClient) Close() error { + return b.bucket.Close() +} + +// Upload the contents of the reader as an object into the bucket. +func (b *SSEBucketClient) Upload(ctx context.Context, name string, r io.Reader) error { + if sse, err := b.getCustomS3SSEConfig(); err != nil { + return err + } else if sse != nil { + // If the underlying bucket client is not S3 and a custom S3 SSE config has been + // provided, the config option will be ignored. + ctx = s3.ContextWithSSEConfig(ctx, sse) + } + + return b.bucket.Upload(ctx, name, r) +} + +// Delete implements objstore.Bucket. +func (b *SSEBucketClient) Delete(ctx context.Context, name string) error { + return b.bucket.Delete(ctx, name) +} + +// Name implements objstore.Bucket. +func (b *SSEBucketClient) Name() string { + return b.bucket.Name() +} + +func (b *SSEBucketClient) getCustomS3SSEConfig() (encrypt.ServerSide, error) { + if b.cfgProvider == nil { + return nil, nil + } + + // No S3 SSE override if the type override hasn't been provided. + sseType := b.cfgProvider.S3SSEType(b.userID) + if sseType == "" { + return nil, nil + } + + cfg := cortex_s3.SSEConfig{ + Type: sseType, + KMSKeyID: b.cfgProvider.S3SSEKMSKeyID(b.userID), + KMSEncryptionContext: b.cfgProvider.S3SSEKMSEncryptionContext(b.userID), + } + + sse, err := cfg.BuildMinioConfig() + if err != nil { + return nil, errors.Wrapf(err, "unable to customise S3 SSE config for tenant %s", b.userID) + } + + return sse, nil +} + +// Iter implements objstore.Bucket. +func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { + return b.bucket.Iter(ctx, dir, f, options...) +} + +// Get implements objstore.Bucket. +func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) { + return b.bucket.Get(ctx, name) +} + +// GetRange implements objstore.Bucket. +func (b *SSEBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + return b.bucket.GetRange(ctx, name, off, length) +} + +// Exists implements objstore.Bucket. +func (b *SSEBucketClient) Exists(ctx context.Context, name string) (bool, error) { + return b.bucket.Exists(ctx, name) +} + +// IsObjNotFoundErr implements objstore.Bucket. +func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool { + return b.bucket.IsObjNotFoundErr(err) +} + +// Attributes implements objstore.Bucket. +func (b *SSEBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + return b.bucket.Attributes(ctx, name) +} + +// ReaderWithExpectedErrs implements objstore.Bucket. +func (b *SSEBucketClient) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + return b.WithExpectedErrs(fn) +} + +// WithExpectedErrs implements objstore.Bucket. +func (b *SSEBucketClient) WithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.Bucket { + if ib, ok := b.bucket.(objstore.InstrumentedBucket); ok { + return &SSEBucketClient{ + userID: b.userID, + bucket: ib.WithExpectedErrs(fn), + cfgProvider: b.cfgProvider, + } + } + + return b +} diff --git a/pkg/storage/bucket/sse_bucket_client_test.go b/pkg/storage/bucket/sse_bucket_client_test.go new file mode 100644 index 0000000000..40570e794b --- /dev/null +++ b/pkg/storage/bucket/sse_bucket_client_test.go @@ -0,0 +1,124 @@ +package bucket + +import ( + "context" + "encoding/base64" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/go-kit/kit/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/cortexproject/cortex/pkg/storage/bucket/s3" + "github.com/cortexproject/cortex/pkg/util/flagext" +) + +func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) { + tests := map[string]struct { + withExpectedErrs bool + }{ + "default client": { + withExpectedErrs: false, + }, + "client with expected errors": { + withExpectedErrs: true, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + const ( + kmsKeyID = "ABC" + kmsEncryptionContext = "{\"department\":\"10103.0\"}" + ) + + var req *http.Request + + // Start a fake HTTP server which simulate S3. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Keep track of the received request. + req = r + + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + s3Cfg := s3.Config{ + Endpoint: srv.Listener.Addr().String(), + Region: "test", + BucketName: "test-bucket", + SecretAccessKey: flagext.Secret{Value: "test"}, + AccessKeyID: "test", + Insecure: true, + } + + s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger()) + require.NoError(t, err) + + // Configure the config provider with NO KMS key ID. + cfgProvider := &mockTenantConfigProvider{} + + var sseBkt objstore.Bucket + if testData.withExpectedErrs { + sseBkt = NewSSEBucketClient("user-1", s3Client, cfgProvider).WithExpectedErrs(s3Client.IsObjNotFoundErr) + } else { + sseBkt = NewSSEBucketClient("user-1", s3Client, cfgProvider) + } + + err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test")) + require.NoError(t, err) + + // Ensure NO KMS header has been injected. + assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption")) + assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id")) + assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-context")) + + // Configure the config provider with a KMS key ID and without encryption context. + cfgProvider.s3SseType = s3.SSEKMS + cfgProvider.s3KmsKeyID = kmsKeyID + + err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test")) + require.NoError(t, err) + + // Ensure the KMS header has been injected. + assert.Equal(t, "aws:kms", req.Header.Get("x-amz-server-side-encryption")) + assert.Equal(t, kmsKeyID, req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id")) + assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-context")) + + // Configure the config provider with a KMS key ID and encryption context. + cfgProvider.s3SseType = s3.SSEKMS + cfgProvider.s3KmsKeyID = kmsKeyID + cfgProvider.s3KmsEncryptionContext = kmsEncryptionContext + + err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test")) + require.NoError(t, err) + + // Ensure the KMS header has been injected. + assert.Equal(t, "aws:kms", req.Header.Get("x-amz-server-side-encryption")) + assert.Equal(t, kmsKeyID, req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id")) + assert.Equal(t, base64.StdEncoding.EncodeToString([]byte(kmsEncryptionContext)), req.Header.Get("x-amz-server-side-encryption-context")) + }) + } +} + +type mockTenantConfigProvider struct { + s3SseType string + s3KmsKeyID string + s3KmsEncryptionContext string +} + +func (m *mockTenantConfigProvider) S3SSEType(_ string) string { + return m.s3SseType +} + +func (m *mockTenantConfigProvider) S3SSEKMSKeyID(_ string) string { + return m.s3KmsKeyID +} + +func (m *mockTenantConfigProvider) S3SSEKMSEncryptionContext(_ string) string { + return m.s3KmsEncryptionContext +} diff --git a/pkg/storage/bucket/user_bucket_client.go b/pkg/storage/bucket/user_bucket_client.go index 4bfc68f664..773e717f9a 100644 --- a/pkg/storage/bucket/user_bucket_client.go +++ b/pkg/storage/bucket/user_bucket_client.go @@ -1,164 +1,15 @@ package bucket import ( - "context" - "io" - - "github.com/minio/minio-go/v7/pkg/encrypt" - "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/objstore" - "github.com/thanos-io/thanos/pkg/objstore/s3" - - cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3" ) -// TenantConfigProvider defines a per-tenant config provider. -type TenantConfigProvider interface { - // S3SSEType returns the per-tenant S3 SSE type. - S3SSEType(user string) string - - // S3SSEKMSKeyID returns the per-tenant S3 KMS-SSE key id or an empty string if not set. - S3SSEKMSKeyID(userID string) string - - // S3SSEKMSEncryptionContext returns the per-tenant S3 KMS-SSE key id or an empty string if not set. - S3SSEKMSEncryptionContext(userID string) string -} - -// UserBucketReaderClient is a wrapper around a objstore.BucketReader that reads from user-specific subfolder. -type UserBucketReaderClient struct { - userID string - prefixedBucket objstore.BucketReader -} - -// UserBucketClient is a wrapper around a objstore.Bucket that prepends writes with a userID -type UserBucketClient struct { - UserBucketReaderClient - prefixedBucket objstore.Bucket - cfgProvider TenantConfigProvider -} - -// NewUserBucketClient makes a new UserBucketClient. The cfgProvider can be nil. -func NewUserBucketClient(userID string, bucket objstore.Bucket, cfgProvider TenantConfigProvider) *UserBucketClient { +// NewUserBucketClient returns a bucket client to use to access the storage on behalf of the provided user. +// The cfgProvider can be nil. +func NewUserBucketClient(userID string, bucket objstore.Bucket, cfgProvider TenantConfigProvider) objstore.InstrumentedBucket { // Inject the user/tenant prefix. bucket = NewPrefixedBucketClient(bucket, userID) - return &UserBucketClient{ - UserBucketReaderClient: UserBucketReaderClient{ - userID: userID, - prefixedBucket: bucket, - }, - prefixedBucket: bucket, - cfgProvider: cfgProvider, - } -} - -// Close implements io.Closer -func (b *UserBucketClient) Close() error { return b.prefixedBucket.Close() } - -// Upload the contents of the reader as an object into the bucket. -func (b *UserBucketClient) Upload(ctx context.Context, name string, r io.Reader) error { - if sse, err := b.getCustomS3SSEConfig(); err != nil { - return err - } else if sse != nil { - // If the underlying bucket client is not S3 and a custom S3 SSE config has been - // provided, the config option will be ignored. - ctx = s3.ContextWithSSEConfig(ctx, sse) - } - - return b.prefixedBucket.Upload(ctx, name, r) -} - -// Delete removes the object with the given name. -func (b *UserBucketClient) Delete(ctx context.Context, name string) error { - return b.prefixedBucket.Delete(ctx, name) -} - -// Name returns the bucket name for the provider. -func (b *UserBucketClient) Name() string { return b.prefixedBucket.Name() } - -func (b *UserBucketClient) getCustomS3SSEConfig() (encrypt.ServerSide, error) { - if b.cfgProvider == nil { - return nil, nil - } - - // No S3 SSE override if the type override hasn't been provided. - sseType := b.cfgProvider.S3SSEType(b.userID) - if sseType == "" { - return nil, nil - } - - cfg := cortex_s3.SSEConfig{ - Type: sseType, - KMSKeyID: b.cfgProvider.S3SSEKMSKeyID(b.userID), - KMSEncryptionContext: b.cfgProvider.S3SSEKMSEncryptionContext(b.userID), - } - - sse, err := cfg.BuildMinioConfig() - if err != nil { - return nil, errors.Wrapf(err, "unable to customise S3 SSE config for tenant %s", b.userID) - } - - return sse, nil -} - -// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *UserBucketReaderClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { - return b.prefixedBucket.Iter(ctx, dir, f, options...) -} - -// Get returns a reader for the given object name. -func (b *UserBucketReaderClient) Get(ctx context.Context, name string) (io.ReadCloser, error) { - return b.prefixedBucket.Get(ctx, name) -} - -// GetRange returns a new range reader for the given object name and range. -func (b *UserBucketReaderClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - return b.prefixedBucket.GetRange(ctx, name, off, length) -} - -// Exists checks if the given object exists in the bucket. -func (b *UserBucketReaderClient) Exists(ctx context.Context, name string) (bool, error) { - return b.prefixedBucket.Exists(ctx, name) -} - -// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. -func (b *UserBucketReaderClient) IsObjNotFoundErr(err error) bool { - return b.prefixedBucket.IsObjNotFoundErr(err) -} - -// Attributes returns attributes of the specified object. -func (b *UserBucketReaderClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { - return b.prefixedBucket.Attributes(ctx, name) -} - -// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment -// thanos_objstore_bucket_operation_failures_total metric. -func (b *UserBucketReaderClient) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.BucketReader { - if ib, ok := b.prefixedBucket.(objstore.InstrumentedBucketReader); ok { - return &UserBucketReaderClient{ - userID: b.userID, - prefixedBucket: ib.ReaderWithExpectedErrs(fn), - } - } - - return b -} - -// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment -// thanos_objstore_bucket_operation_failures_total metric. -func (b *UserBucketClient) WithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.Bucket { - if ib, ok := b.prefixedBucket.(objstore.InstrumentedBucket); ok { - nb := ib.WithExpectedErrs(fn) - - return &UserBucketClient{ - UserBucketReaderClient: UserBucketReaderClient{ - userID: b.userID, - prefixedBucket: nb, - }, - prefixedBucket: nb, - } - } - - return b + // Inject the SSE config. + return NewSSEBucketClient(userID, bucket, cfgProvider) } diff --git a/pkg/storage/bucket/user_bucket_client_test.go b/pkg/storage/bucket/user_bucket_client_test.go deleted file mode 100644 index 25b173de3f..0000000000 --- a/pkg/storage/bucket/user_bucket_client_test.go +++ /dev/null @@ -1,102 +0,0 @@ -package bucket - -import ( - "context" - "encoding/base64" - "net/http" - "net/http/httptest" - "strings" - "testing" - - "github.com/go-kit/kit/log" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/cortexproject/cortex/pkg/storage/bucket/s3" - "github.com/cortexproject/cortex/pkg/util/flagext" -) - -func TestUserBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) { - const ( - kmsKeyID = "ABC" - kmsEncryptionContext = "{\"department\":\"10103.0\"}" - ) - - var req *http.Request - - // Start a fake HTTP server which simulate S3. - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Keep track of the received request. - req = r - - w.WriteHeader(http.StatusOK) - })) - defer srv.Close() - - s3Cfg := s3.Config{ - Endpoint: srv.Listener.Addr().String(), - Region: "test", - BucketName: "test-bucket", - SecretAccessKey: flagext.Secret{Value: "test"}, - AccessKeyID: "test", - Insecure: true, - } - - s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger()) - require.NoError(t, err) - - // Configure the config provider with NO KMS key ID. - cfgProvider := &mockTenantConfigProvider{} - userBkt := NewUserBucketClient("user-1", s3Client, cfgProvider) - - err = userBkt.Upload(context.Background(), "test", strings.NewReader("test")) - require.NoError(t, err) - - // Ensure NO KMS header has been injected. - assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption")) - assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id")) - assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-context")) - - // Configure the config provider with a KMS key ID and without encryption context. - cfgProvider.s3SseType = s3.SSEKMS - cfgProvider.s3KmsKeyID = kmsKeyID - - err = userBkt.Upload(context.Background(), "test", strings.NewReader("test")) - require.NoError(t, err) - - // Ensure the KMS header has been injected. - assert.Equal(t, "aws:kms", req.Header.Get("x-amz-server-side-encryption")) - assert.Equal(t, kmsKeyID, req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id")) - assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-context")) - - // Configure the config provider with a KMS key ID and encryption context. - cfgProvider.s3SseType = s3.SSEKMS - cfgProvider.s3KmsKeyID = kmsKeyID - cfgProvider.s3KmsEncryptionContext = kmsEncryptionContext - - err = userBkt.Upload(context.Background(), "test", strings.NewReader("test")) - require.NoError(t, err) - - // Ensure the KMS header has been injected. - assert.Equal(t, "aws:kms", req.Header.Get("x-amz-server-side-encryption")) - assert.Equal(t, kmsKeyID, req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id")) - assert.Equal(t, base64.StdEncoding.EncodeToString([]byte(kmsEncryptionContext)), req.Header.Get("x-amz-server-side-encryption-context")) -} - -type mockTenantConfigProvider struct { - s3SseType string - s3KmsKeyID string - s3KmsEncryptionContext string -} - -func (m *mockTenantConfigProvider) S3SSEType(_ string) string { - return m.s3SseType -} - -func (m *mockTenantConfigProvider) S3SSEKMSKeyID(_ string) string { - return m.s3KmsKeyID -} - -func (m *mockTenantConfigProvider) S3SSEKMSEncryptionContext(_ string) string { - return m.s3KmsEncryptionContext -} diff --git a/tools/blocksconvert/builder/builder.go b/tools/blocksconvert/builder/builder.go index 5677168e04..816e8613e4 100644 --- a/tools/blocksconvert/builder/builder.go +++ b/tools/blocksconvert/builder/builder.go @@ -251,7 +251,7 @@ func (p *builderProcessor) ProcessPlanEntries(ctx context.Context, planEntryCh c return ulid.String(), nil } -func uploadBlock(ctx context.Context, planLog log.Logger, userBucket *bucket.UserBucketClient, blockDir string) error { +func uploadBlock(ctx context.Context, planLog log.Logger, userBucket objstore.Bucket, blockDir string) error { boff := util.NewBackoff(ctx, util.BackoffConfig{ MinBackoff: 1 * time.Second, MaxBackoff: 5 * time.Second, diff --git a/tools/thanosconvert/thanosconvert.go b/tools/thanosconvert/thanosconvert.go index de0618b24d..1f6085a37b 100644 --- a/tools/thanosconvert/thanosconvert.go +++ b/tools/thanosconvert/thanosconvert.go @@ -128,7 +128,7 @@ func (c ThanosBlockConverter) convertUser(ctx context.Context, user string) (Per return results, nil } -func (c *ThanosBlockConverter) uploadNewMeta(ctx context.Context, userBucketClient *bucket.UserBucketClient, blockID string, meta metadata.Meta) error { +func (c *ThanosBlockConverter) uploadNewMeta(ctx context.Context, userBucketClient objstore.Bucket, blockID string, meta metadata.Meta) error { var body bytes.Buffer if err := meta.Write(&body); err != nil { return errors.Wrap(err, "encode json")