From 74289cfde798d538770bf2ed15f87a1394e6ab1c Mon Sep 17 00:00:00 2001 From: Bertrand Paquet Date: Tue, 20 Aug 2024 17:04:15 +0200 Subject: [PATCH] Fix #4885: Use multipart upload instead of CopyObject for touching file > 5GB Signed-off-by: Bertrand Paquet --- cache/remotecache/s3/s3.go | 108 +++++++++++++++++++++++++++++++------ 1 file changed, 93 insertions(+), 15 deletions(-) diff --git a/cache/remotecache/s3/s3.go b/cache/remotecache/s3/s3.go index d994c33c533b8..0aa9ea62523c6 100644 --- a/cache/remotecache/s3/s3.go +++ b/cache/remotecache/s3/s3.go @@ -16,6 +16,7 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/containerd/containerd/content" "github.com/containerd/containerd/labels" @@ -44,6 +45,7 @@ const ( attrSecretAccessKey = "secret_access_key" attrSessionToken = "session_token" attrUsePathStyle = "use_path_style" + maxCopyObjectSize = 5 * 1024 * 1024 * 1024 ) type Config struct { @@ -203,13 +205,13 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) { } key := e.s3Client.blobKey(dgstPair.Descriptor.Digest) - exists, err := e.s3Client.exists(ctx, key) + exists, size, err := e.s3Client.exists(ctx, key) if err != nil { return nil, errors.Wrapf(err, "failed to check file presence in cache") } if exists != nil { if time.Since(*exists) > e.config.TouchRefresh { - err = e.s3Client.touch(ctx, key) + err = e.s3Client.touch(ctx, key, size) if err != nil { return nil, errors.Wrapf(err, "failed to touch file") } @@ -449,7 +451,7 @@ func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io return err } -func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, error) { +func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, *int64, error) { input := &s3.HeadObjectInput{ Bucket: &s3Client.bucket, Key: &key, @@ -458,26 +460,102 @@ func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, e head, err := s3Client.HeadObject(ctx, input) if err != nil { if isNotFound(err) { - return nil, nil + return nil, nil, nil } - return nil, err + return nil, nil, err + } + return head.LastModified, head.ContentLength, nil +} + +func buildCopySourceRange(start int64, objectSize int64) string { + end := start + maxCopyObjectSize - 1 + if end > objectSize { + end = objectSize - 1 } - return head.LastModified, nil + startRange := strconv.FormatInt(start, 10) + stopRange := strconv.FormatInt(end, 10) + return "bytes=" + startRange + "-" + stopRange } -func (s3Client *s3Client) touch(ctx context.Context, key string) error { +func (s3Client *s3Client) touch(ctx context.Context, key string, size *int64) error { copySource := fmt.Sprintf("%s/%s", s3Client.bucket, key) - cp := &s3.CopyObjectInput{ - Bucket: &s3Client.bucket, - CopySource: ©Source, - Key: &key, - Metadata: map[string]string{"updated-at": time.Now().String()}, - MetadataDirective: "REPLACE", + + // CopyObject does not support files > 5GB + if *size < maxCopyObjectSize { + cp := &s3.CopyObjectInput{ + Bucket: &s3Client.bucket, + CopySource: ©Source, + Key: &key, + Metadata: map[string]string{"updated-at": time.Now().String()}, + MetadataDirective: "REPLACE", + } + + _, err := s3Client.CopyObject(ctx, cp) + + return err + } + input := &s3.CreateMultipartUploadInput{ + Bucket: &s3Client.bucket, + Key: &key, } - _, err := s3Client.CopyObject(ctx, cp) + output, err := s3Client.CreateMultipartUpload(ctx, input) + if err != nil { + return err + } + + var currentPartNumber int32 = 1 + var currentPosition int64 + var completedParts []types.CompletedPart + + for currentPosition < *size { + copyRange := buildCopySourceRange(currentPosition, *size) + partInput := s3.UploadPartCopyInput{ + Bucket: &s3Client.bucket, + CopySource: ©Source, + CopySourceRange: ©Range, + Key: &key, + PartNumber: ¤tPartNumber, + UploadId: output.UploadId, + } + uploadPartCopyResult, err := s3Client.UploadPartCopy(ctx, &partInput) + if err != nil { + abortIn := s3.AbortMultipartUploadInput{ + Bucket: &s3Client.bucket, + Key: &key, + UploadId: output.UploadId, + } + _, errAbort := s3Client.AbortMultipartUpload(ctx, &abortIn) + if errAbort != nil { + return errAbort + } + return err + } + partNumber := new(int32) + *partNumber = currentPartNumber + completedParts = append(completedParts, types.CompletedPart{ + ETag: uploadPartCopyResult.CopyPartResult.ETag, + PartNumber: partNumber, + }) + + currentPartNumber++ + currentPosition += maxCopyObjectSize + } + + CompleteMultipartUploadInput := &s3.CompleteMultipartUploadInput{ + Bucket: &s3Client.bucket, + Key: &key, + UploadId: output.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: completedParts, + }, + } + _, completeErr := s3Client.CompleteMultipartUpload(ctx, CompleteMultipartUploadInput) + if completeErr != nil { + return completeErr + } - return err + return nil } func (s3Client *s3Client) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {