Skip to content

Commit

Permalink
Fix #4885: Use multipart upload instead of CopyObject for touching fi…
Browse files Browse the repository at this point in the history
…le > 5GB

Signed-off-by: Bertrand Paquet <bertrand.paquet@gmail.com>
  • Loading branch information
bpaquet committed Aug 21, 2024
1 parent 49f3d8f commit 6c40a3e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 15 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ Other options are:
* `name=<manifest>`: specify name of the manifest to use (default `buildkit`)
* Multiple manifest names can be specified at the same time, separated by `;`. The standard use case is to use the git sha1 as name, and the branch name as duplicate, and load both with 2 `import-cache` commands.
* `ignore-error=<false|true>`: specify if error is ignored in case cache export fails (default: `false`)
* `touch_refresh=24h`: Instead of being uploaded again when not changed, blobs files will be "touched" on s3 every `touch_refresh`, default is 24h. Due to this, an expiration policy can be set on the S3 bucket to cleanup useless files automatically. Manifests files are systematically rewritten, there is no need to touch them.

`--import-cache` options:
* `type=s3`
Expand Down
110 changes: 95 additions & 15 deletions cache/remotecache/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/labels"
Expand Down Expand Up @@ -44,6 +45,7 @@ const (
attrSecretAccessKey = "secret_access_key"
attrSessionToken = "session_token"
attrUsePathStyle = "use_path_style"
maxCopyObjectSize = 5 * 1024 * 1024 * 1024
)

type Config struct {
Expand Down Expand Up @@ -203,13 +205,13 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) {
}

key := e.s3Client.blobKey(dgstPair.Descriptor.Digest)
exists, err := e.s3Client.exists(ctx, key)
exists, size, err := e.s3Client.exists(ctx, key)
if err != nil {
return nil, errors.Wrapf(err, "failed to check file presence in cache")
}
if exists != nil {
if time.Since(*exists) > e.config.TouchRefresh {
err = e.s3Client.touch(ctx, key)
err = e.s3Client.touch(ctx, key, size)
if err != nil {
return nil, errors.Wrapf(err, "failed to touch file")
}
Expand Down Expand Up @@ -449,7 +451,7 @@ func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io
return err
}

func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, error) {
func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, *int64, error) {
input := &s3.HeadObjectInput{
Bucket: &s3Client.bucket,
Key: &key,
Expand All @@ -458,26 +460,104 @@ func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, e
head, err := s3Client.HeadObject(ctx, input)
if err != nil {
if isNotFound(err) {
return nil, nil
return nil, nil, nil
}
return nil, err
return nil, nil, err
}
return head.LastModified, head.ContentLength, nil
}

func buildCopySourceRange(start int64, objectSize int64) string {
end := start + maxCopyObjectSize - 1
if end > objectSize {
end = objectSize - 1
}
return head.LastModified, nil
startRange := strconv.FormatInt(start, 10)
stopRange := strconv.FormatInt(end, 10)
return "bytes=" + startRange + "-" + stopRange
}

func (s3Client *s3Client) touch(ctx context.Context, key string) error {
func (s3Client *s3Client) touch(ctx context.Context, key string, size *int64) (err error) {
copySource := fmt.Sprintf("%s/%s", s3Client.bucket, key)
cp := &s3.CopyObjectInput{
Bucket: &s3Client.bucket,
CopySource: &copySource,
Key: &key,
Metadata: map[string]string{"updated-at": time.Now().String()},
MetadataDirective: "REPLACE",

// CopyObject does not support files > 5GB
if *size < maxCopyObjectSize {
cp := &s3.CopyObjectInput{
Bucket: &s3Client.bucket,
CopySource: &copySource,
Key: &key,
Metadata: map[string]string{"updated-at": time.Now().String()},
MetadataDirective: "REPLACE",
}

_, err := s3Client.CopyObject(ctx, cp)

return err
}
input := &s3.CreateMultipartUploadInput{
Bucket: &s3Client.bucket,
Key: &key,
}

_, err := s3Client.CopyObject(ctx, cp)
output, err := s3Client.CreateMultipartUpload(ctx, input)
if err != nil {
return err
}

return err
defer func() {
abortIn := s3.AbortMultipartUploadInput{
Bucket: &s3Client.bucket,
Key: &key,
UploadId: output.UploadId,
}
if err != nil {
s3Client.AbortMultipartUpload(ctx, &abortIn)
}
}()

var currentPartNumber int32 = 1
var currentPosition int64
var completedParts []types.CompletedPart

for currentPosition < *size {
copyRange := buildCopySourceRange(currentPosition, *size)
partInput := s3.UploadPartCopyInput{
Bucket: &s3Client.bucket,
CopySource: &copySource,
CopySourceRange: &copyRange,
Key: &key,
PartNumber: &currentPartNumber,
UploadId: output.UploadId,
}
uploadPartCopyResult, err := s3Client.UploadPartCopy(ctx, &partInput)
if err != nil {
return err
}
partNumber := new(int32)
*partNumber = currentPartNumber
completedParts = append(completedParts, types.CompletedPart{
ETag: uploadPartCopyResult.CopyPartResult.ETag,
PartNumber: partNumber,
})

currentPartNumber++
currentPosition += maxCopyObjectSize
}

completeMultipartUploadInput := &s3.CompleteMultipartUploadInput{
Bucket: &s3Client.bucket,
Key: &key,
UploadId: output.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: completedParts,
},
}

if _, err := s3Client.CompleteMultipartUpload(ctx, completeMultipartUploadInput); err != nil {
return err
}

return nil
}

func (s3Client *s3Client) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
Expand Down

0 comments on commit 6c40a3e

Please sign in to comment.