Commit f6eaebe

service/s3/s3manager: Use sync.Pool for reuse of part buffers for streaming payloads

skmcgrail committed Oct 1, 2019
1 parent d592c40 commit f6eaebe
Showing 3 changed files with 42 additions and 19 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG_PENDING.md
@@ -29,6 +29,9 @@ SDK Enhancements
* Related to [aws/aws-sdk-go#2310](https://github.com/aws/aws-sdk-go/pull/2310)
* Fixes [#251](https://github.com/aws/aws-sdk-go-v2/issues/251)
* `aws/request` : Retryer is now a named field on Request. ([#393](https://github.com/aws/aws-sdk-go-v2/pull/393))

* `service/s3/s3manager`: Adds a `sync.Pool` to Uploader for reuse of part `[]byte` when uploading streaming body payloads
* Fixes [#402](https://github.com/aws/aws-sdk-go-v2/issues/402)
* Uses the new behavior introduced in V1 [#2863](https://github.com/aws/aws-sdk-go/pull/2863) which allows the reuse of the `sync.Pool` across multiple Upload requests that match part sizes.

SDK Bugs
---
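A minimal usage sketch of what the changelog entry above means in practice: one Uploader reused across calls, so pooled part buffers are recycled between uploads that share a part size. The bucket and key names are placeholders, and the config loading assumes the v2 `external` package of this era.

```go
package main

import (
	"bytes"
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/external"
	"github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
)

func main() {
	cfg, err := external.LoadDefaultAWSConfig()
	if err != nil {
		log.Fatal(err)
	}

	// A single Uploader reused across calls; its internal partPool hands
	// the same []byte part buffers to each subsequent upload.
	uploader := s3manager.NewUploader(cfg)

	for _, key := range []string{"first-object", "second-object"} {
		// bytes.Buffer has no Seek method, so these uploads take the
		// pooled-buffer branch of nextReader shown later in this diff.
		body := bytes.NewBufferString("example streaming payload")

		if _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
			Bucket: aws.String("example-bucket"), // placeholder names
			Key:    aws.String(key),
			Body:   body,
		}); err != nil {
			log.Fatal(err)
		}
	}
}
```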
2 changes: 1 addition & 1 deletion service/s3/s3manager/download.go
@@ -433,7 +433,7 @@ func (d *downloader) downloadChunk(chunk dlchunk) error {
// Check if the returned error is an errReadingBody.
// If err is errReadingBody this indicates that an error
// occurred while copying the http response body.
// If this occurs we unwrap the error to set the underling error
// If this occurs we unwrap the error to set the underlying error
// and attempt any remaining retries.
if bodyErr, ok := err.(*errReadingBody); ok {
err = bodyErr.Unwrap()
54 changes: 37 additions & 17 deletions service/s3/s3manager/upload.go
@@ -162,6 +162,9 @@ type Uploader struct {

// Defines the buffer strategy used when uploading a part
BufferProvider ReadSeekerWriteToProvider

// partPool allows for the reuse of streaming payload part buffers between upload calls
partPool *partPool
}

// NewUploader creates a new Uploader instance to upload objects to S3. Pass in
@@ -179,8 +182,12 @@ type Uploader struct {
// u.PartSize = 64 * 1024 * 1024 // 64MB per part
// })
func NewUploader(cfg aws.Config, options ...func(*Uploader)) *Uploader {
return newUploader(s3.New(cfg), options...)
}

func newUploader(client s3iface.ClientAPI, options ...func(*Uploader)) *Uploader {
u := &Uploader{
S3: s3.New(cfg),
S3: client,
PartSize: DefaultUploadPartSize,
Concurrency: DefaultUploadConcurrency,
LeavePartsOnError: false,
@@ -192,6 +199,8 @@ func NewUploader(cfg aws.Config, options ...func(*Uploader)) *Uploader {
option(u)
}

u.partPool = newPartPool(u.PartSize)

return u
}

@@ -214,20 +223,7 @@ func NewUploader(cfg aws.Config, options ...func(*Uploader)) *Uploader {
// u.PartSize = 64 * 1024 * 1024 // 64MB per part
// })
func NewUploaderWithClient(svc s3iface.ClientAPI, options ...func(*Uploader)) *Uploader {
u := &Uploader{
S3: svc,
PartSize: DefaultUploadPartSize,
Concurrency: DefaultUploadConcurrency,
LeavePartsOnError: false,
MaxUploadParts: MaxUploadParts,
BufferProvider: defaultUploadBufferProvider(),
}

for _, option := range options {
option(u)
}

return u
return newUploader(svc, options...)
}
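Both constructors now funnel through newUploader, which applies functional options before creating the pool, so a tuned PartSize also sizes the pooled buffers. A small sketch, assuming svc is an existing S3 API client; the 16MB size is an arbitrary example:

```go
// Options run inside newUploader before newPartPool is called, so the
// pool's buffers match the tuned size rather than the 5MB default.
uploader := s3manager.NewUploaderWithClient(svc, func(u *s3manager.Uploader) {
	u.PartSize = 16 * 1024 * 1024 // 16MB parts -> pool of 16MB buffers
})
```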

// Upload uploads an object to S3, intelligently buffering large files into
@@ -287,6 +283,13 @@ func (u Uploader) UploadWithContext(ctx context.Context, input *UploadInput, opt
for _, opt := range opts {
opt(&i.cfg)
}

// If PartSize was changed or partPool was never set up, then we need to allocate a new pool
// so that we return []byte slices of the correct size
if i.cfg.partPool == nil || i.cfg.partPool.partSize != i.cfg.PartSize {
i.cfg.partPool = newPartPool(i.cfg.PartSize)
}

i.cfg.RequestOptions = append(i.cfg.RequestOptions, request.WithAppendUserAgent("S3Manager"))

return i.upload()
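The guard above matters for per-call options: because UploadWithContext works on a copy of the Uploader's config, a call that overrides PartSize gets its own freshly sized pool, while calls that leave PartSize alone keep sharing the Uploader's pool. A hedged sketch, assuming uploader, ctx, and input already exist:

```go
// This call's config copy diverges from the Uploader's PartSize, so the
// nil/size check above swaps in a new pool sized for 10MB parts.
out, err := uploader.UploadWithContext(ctx, input, func(u *s3manager.Uploader) {
	u.PartSize = 10 * 1024 * 1024 // hypothetical per-call override
})
```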
@@ -460,11 +463,13 @@ func (u *uploader) nextReader() (io.ReadSeeker, int, func(), error) {
return reader, int(n), cleanup, err

default:
part := make([]byte, u.cfg.PartSize)
part := u.cfg.partPool.Get().([]byte)
n, err := readFillBuf(r, part)
u.readerPos += int64(n)

cleanup := func() {}
cleanup := func() {
u.cfg.partPool.Put(part)
}

return bytes.NewReader(part[0:n]), n, cleanup, err
}
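The cleanup closure is what makes the Put safe: the bytes.Reader handed to the caller aliases the pooled buffer, so the buffer can only return to the pool once the part upload is done with the reader. A self-contained sketch of that hand-off, with partPool copied from the bottom of this diff; readFillBuf here is an assumed stand-in for the SDK's unexported helper of the same name, and the 8-byte part size is purely for the demo:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

type partPool struct {
	partSize int64
	sync.Pool
}

func newPartPool(partSize int64) *partPool {
	p := &partPool{partSize: partSize}
	p.New = func() interface{} { return make([]byte, p.partSize) }
	return p
}

// readFillBuf keeps reading until b is full or the reader is drained,
// mirroring what a fixed-size part read must do.
func readFillBuf(r io.Reader, b []byte) (n int, err error) {
	for n < len(b) && err == nil {
		var m int
		m, err = r.Read(b[n:])
		n += m
	}
	return n, err
}

// nextPart mirrors the pooled branch of nextReader: the returned reader
// aliases the pooled buffer, so Put is deferred to the cleanup callback.
func nextPart(r io.Reader, pool *partPool) (io.ReadSeeker, int, func(), error) {
	part := pool.Get().([]byte)
	n, err := readFillBuf(r, part)
	cleanup := func() {
		pool.Put(part) // recycle only after the part reader is finished
	}
	return bytes.NewReader(part[:n]), n, cleanup, err
}

func main() {
	pool := newPartPool(8) // 8-byte parts keep the demo readable
	src := strings.NewReader("hello pooled part buffers")

	for {
		reader, n, cleanup, err := nextPart(src, pool)
		if n > 0 {
			chunk := make([]byte, n)
			io.ReadFull(reader, chunk)
			fmt.Printf("part: %q\n", chunk)
		}
		cleanup() // buffer returns to the pool for the next iteration
		if err != nil || n == 0 {
			break
		}
	}
}
```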
@@ -751,3 +756,18 @@ func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput {

return resp.CompleteMultipartUploadOutput
}

type partPool struct {
partSize int64
sync.Pool
}

func newPartPool(partSize int64) *partPool {
p := &partPool{partSize: partSize}

p.New = func() interface{} {
return make([]byte, p.partSize)
}

return p
}
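Because sync.Pool is embedded, Get and Put promote to *partPool directly, and the New closure captures p, so every pool miss allocates a slice at the pool's fixed part size. A quick in-package round-trip using the constructor above:

```go
pool := newPartPool(DefaultUploadPartSize) // 5MB default part size

buf := pool.Get().([]byte) // recycled if available, else a fresh 5MB slice
// ... fill buf with part data and upload the part ...
pool.Put(buf) // hand the buffer back for the next part or Upload call
```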
