From fe72a52350a8962175bb71c531ec9724ce48abd8 Mon Sep 17 00:00:00 2001
From: Sean McGrail
Date: Tue, 1 Oct 2019 14:58:25 -0700
Subject: [PATCH] service/s3/s3manager: Move part buffer pool upwards to allow
 reuse. (#2863)

---
 CHANGELOG_PENDING.md           |  2 ++
 service/s3/s3manager/upload.go | 33 +++++++++++++++++++++++++++------
 2 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md
index 150bde9e891..eef6be63903 100644
--- a/CHANGELOG_PENDING.md
+++ b/CHANGELOG_PENDING.md
@@ -1,6 +1,8 @@
 ### SDK Features

 ### SDK Enhancements
+* `service/s3/s3manager`: Allow reuse of Uploader buffer `sync.Pool` amongst multiple Upload calls ([#2863](https://github.com/aws/aws-sdk-go/pull/2863))
+  * The `sync.Pool` used for the reuse of `[]byte` slices when handling streaming payloads will now be shared across multiple Upload calls when the upload part size remains constant.

 ### SDK Bugs
 * `internal/ini`: Fix ini parser to handle empty values [#2860](https://github.com/aws/aws-sdk-go/pull/2860)

diff --git a/service/s3/s3manager/upload.go b/service/s3/s3manager/upload.go
index 799b79f18a5..8debfcd026c 100644
--- a/service/s3/s3manager/upload.go
+++ b/service/s3/s3manager/upload.go
@@ -165,6 +165,9 @@ type Uploader struct {

 	// Defines the buffer strategy used when uploading a part
 	BufferProvider ReadSeekerWriteToProvider
+
+	// partPool allows for the reuse of streaming payload part buffers between upload calls
+	partPool *partPool
 }

 // NewUploader creates a new Uploader instance to upload objects to S3. Pass In
@@ -201,6 +204,8 @@ func newUploader(client s3iface.S3API, options ...func(*Uploader)) *Uploader {
 		option(u)
 	}

+	u.partPool = newPartPool(u.PartSize)
+
 	return u
 }

@@ -283,6 +288,7 @@ func (u Uploader) UploadWithContext(ctx aws.Context, input *UploadInput, opts ..
 	for _, opt := range opts {
 		opt(&i.cfg)
 	}
+
 	i.cfg.RequestOptions = append(i.cfg.RequestOptions, request.WithAppendUserAgent("S3Manager"))

 	return i.upload()
@@ -352,8 +358,6 @@ type uploader struct {

 	readerPos int64 // current reader position
 	totalSize int64 // set to -1 if the size is not known
-
-	bufferPool sync.Pool
 }

 // internal logic for deciding whether to upload a single part or use a
@@ -393,8 +397,10 @@ func (u *uploader) init() error {
 		u.cfg.MaxUploadParts = MaxUploadParts
 	}

-	u.bufferPool = sync.Pool{
-		New: func() interface{} { return make([]byte, u.cfg.PartSize) },
+	// If PartSize was changed or partPool was never set up then we need to allocate a new pool
+	// so that we return []byte slices of the correct size
+	if u.cfg.partPool == nil || u.cfg.partPool.partSize != u.cfg.PartSize {
+		u.cfg.partPool = newPartPool(u.cfg.PartSize)
 	}

 	// Try to get the total size for some optimizations
@@ -466,12 +472,12 @@ func (u *uploader) nextReader() (io.ReadSeeker, int, func(), error) {
 		return reader, int(n), cleanup, err

 	default:
-		part := u.bufferPool.Get().([]byte)
+		part := u.cfg.partPool.Get().([]byte)
 		n, err := readFillBuf(r, part)
 		u.readerPos += int64(n)

 		cleanup := func() {
-			u.bufferPool.Put(part)
+			u.cfg.partPool.Put(part)
 		}

 		return bytes.NewReader(part[0:n]), n, cleanup, err
@@ -751,3 +757,18 @@ func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput {

 	return resp
 }
+
+type partPool struct {
+	partSize int64
+	sync.Pool
+}
+
+func newPartPool(partSize int64) *partPool {
+	p := &partPool{partSize: partSize}
+
+	p.New = func() interface{} {
+		return make([]byte, p.partSize)
+	}
+
+	return p
+}
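Below the patch, a minimal usage sketch (not part of the patch itself) of what this change enables: one `Uploader` constructed once and reused across multiple `Upload` calls, so each call draws its `[]byte` part buffers from the same shared `partPool` instead of allocating a fresh `sync.Pool` per call. The bucket name, file paths, and part size are hypothetical placeholders.

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
	sess := session.Must(session.NewSession())

	// Construct the Uploader once. newUploader seeds u.partPool from
	// u.PartSize, so as long as PartSize is not changed per call, every
	// Upload below reuses the same pool of part-sized []byte slices.
	uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
		u.PartSize = 16 * 1024 * 1024 // 16 MiB parts (placeholder)
	})

	// Placeholder object list; reusing the single Uploader is the point.
	for _, path := range []string{"a.bin", "b.bin", "c.bin"} {
		f, err := os.Open(path)
		if err != nil {
			log.Fatal(err)
		}

		out, err := uploader.Upload(&s3manager.UploadInput{
			Bucket: aws.String("my-bucket"), // placeholder bucket
			Key:    aws.String(path),
			Body:   f,
		})
		f.Close()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("uploaded:", out.Location)
	}
}
```

Note that `init()` guards the reuse: if a per-call option mutates `PartSize`, `u.cfg.partPool.partSize != u.cfg.PartSize` and a fresh pool is allocated, so sharing only holds while the part size stays constant.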