From f6eaebe32f03c639f52f110f0a74737a6d3f6b96 Mon Sep 17 00:00:00 2001 From: Sean McGrail Date: Thu, 26 Sep 2019 15:07:15 -0700 Subject: [PATCH] service/s3/s3manager: Use sync.Pool for reuse of part buffers for streaming payloads --- CHANGELOG_PENDING.md | 5 ++- service/s3/s3manager/download.go | 2 +- service/s3/s3manager/upload.go | 54 ++++++++++++++++++++++---------- 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 453f8a5bae5..ce1dd229240 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -29,6 +29,9 @@ SDK Enhancements * Related to [aws/aws-sdk-go#2310](https://github.com/aws/aws-sdk-go/pull/2310) * Fixes [#251](https://github.com/aws/aws-sdk-go-v2/issues/251) * `aws/request` : Retryer is now a named field on Request. ([#393](https://github.com/aws/aws-sdk-go-v2/pull/393)) - +* `service/s3/s3manager`: Adds a `sync.Pool` to Uploader for reuse of part `[]byte` when uploading streaming body payloads + * Fixes [#402](https://github.com/aws/aws-sdk-go-v2/issues/402) + * Uses the new behavior introduced in V1 [#2863](https://github.com/aws/aws-sdk-go/pull/2863) which allows the reuse of the sync.Pool across multiple Upload requests that match part sizes. + SDK Bugs --- diff --git a/service/s3/s3manager/download.go b/service/s3/s3manager/download.go index a4943f73b08..749fb086a82 100644 --- a/service/s3/s3manager/download.go +++ b/service/s3/s3manager/download.go @@ -433,7 +433,7 @@ func (d *downloader) downloadChunk(chunk dlchunk) error { // Check if the returned error is an errReadingBody. // If err is errReadingBody this indicates that an error // occurred while copying the http response body. - // If this occurs we unwrap the error to set the underling error + // If this occurs we unwrap the error to set the underlying error // and attempt any remaining retries. 
if bodyErr, ok := err.(*errReadingBody); ok { err = bodyErr.Unwrap() diff --git a/service/s3/s3manager/upload.go b/service/s3/s3manager/upload.go index 30528fc257e..f6045b87a4e 100644 --- a/service/s3/s3manager/upload.go +++ b/service/s3/s3manager/upload.go @@ -162,6 +162,9 @@ type Uploader struct { // Defines the buffer strategy used when uploading a part BufferProvider ReadSeekerWriteToProvider + + // partPool allows for the reuse of streaming payload part buffers between upload calls + partPool *partPool } // NewUploader creates a new Uploader instance to upload objects to S3. Pass In @@ -179,8 +182,12 @@ type Uploader struct { // u.PartSize = 64 * 1024 * 1024 // 64MB per part // }) func NewUploader(cfg aws.Config, options ...func(*Uploader)) *Uploader { + return newUploader(s3.New(cfg), options...) +} + +func newUploader(client s3iface.ClientAPI, options ...func(*Uploader)) *Uploader { u := &Uploader{ - S3: s3.New(cfg), + S3: client, PartSize: DefaultUploadPartSize, Concurrency: DefaultUploadConcurrency, LeavePartsOnError: false, MaxUploadParts: MaxUploadParts, BufferProvider: defaultUploadBufferProvider(), } @@ -192,6 +199,8 @@ func NewUploader(cfg aws.Config, options ...func(*Uploader)) *Uploader { option(u) } + u.partPool = newPartPool(u.PartSize) + return u } @@ -214,20 +223,7 @@ func NewUploader(cfg aws.Config, options ...func(*Uploader)) *Uploader { // u.PartSize = 64 * 1024 * 1024 // 64MB per part // }) func NewUploaderWithClient(svc s3iface.ClientAPI, options ...func(*Uploader)) *Uploader { - u := &Uploader{ - S3: svc, - PartSize: DefaultUploadPartSize, - Concurrency: DefaultUploadConcurrency, - LeavePartsOnError: false, - MaxUploadParts: MaxUploadParts, - BufferProvider: defaultUploadBufferProvider(), - } - - for _, option := range options { - option(u) - } - - return u + return newUploader(svc, options...) 
} // Upload uploads an object to S3, intelligently buffering large files into @@ -287,6 +283,13 @@ func (u Uploader) UploadWithContext(ctx context.Context, input *UploadInput, opt for _, opt := range opts { opt(&i.cfg) } + + // If PartSize was changed or partPool was never set up then we need to allocate a new pool + // so that we return []byte slices of the correct size + if i.cfg.partPool == nil || i.cfg.partPool.partSize != i.cfg.PartSize { + i.cfg.partPool = newPartPool(i.cfg.PartSize) + } + i.cfg.RequestOptions = append(i.cfg.RequestOptions, request.WithAppendUserAgent("S3Manager")) return i.upload() @@ -460,11 +463,13 @@ func (u *uploader) nextReader() (io.ReadSeeker, int, func(), error) { return reader, int(n), cleanup, err default: - part := make([]byte, u.cfg.PartSize) + part := u.cfg.partPool.Get().([]byte) n, err := readFillBuf(r, part) u.readerPos += int64(n) - cleanup := func() {} + cleanup := func() { + u.cfg.partPool.Put(part) + } return bytes.NewReader(part[0:n]), n, cleanup, err } @@ -751,3 +756,18 @@ func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput { return resp.CompleteMultipartUploadOutput } + +type partPool struct { + partSize int64 + sync.Pool +} + +func newPartPool(partSize int64) *partPool { + p := &partPool{partSize: partSize} + + p.New = func() interface{} { + return make([]byte, p.partSize) + } + + return p +}