Skip to content

Commit

Permalink
service/s3/s3manager: Use sync.Pool for reuse of part buffers for streaming payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
skmcgrail committed Sep 26, 2019
1 parent eff3d22 commit a11b7a1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ SDK Enhancements
* Related to [aws/aws-sdk-go#2310](https://github.com/aws/aws-sdk-go/pull/2310)
* Fixes [#251](https://github.com/aws/aws-sdk-go-v2/issues/251)
* `aws/request` : Retryer is now a named field on Request. ([#393](https://github.com/aws/aws-sdk-go-v2/pull/393))

* `service/s3/s3manager`: Adds a `sync.Pool` to Uploader for reuse of part `[]byte` when uploading streaming body payloads
* Fixes [#402](https://github.com/aws/aws-sdk-go-v2/issues/402)

SDK Bugs
---
33 changes: 16 additions & 17 deletions service/s3/s3manager/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,12 @@ type Uploader struct {
// u.PartSize = 64 * 1024 * 1024 // 64MB per part
// })
func NewUploader(cfg aws.Config, options ...func(*Uploader)) *Uploader {
	// Build the S3 client from the provided config, then delegate to the
	// shared constructor so default settings are applied in one place.
	svc := s3.New(cfg)
	return newUploader(svc, options...)
}

func newUploader(client s3iface.ClientAPI, options ...func(*Uploader)) *Uploader {
u := &Uploader{
S3: s3.New(cfg),
S3: client,
PartSize: DefaultUploadPartSize,
Concurrency: DefaultUploadConcurrency,
LeavePartsOnError: false,
Expand Down Expand Up @@ -214,20 +218,7 @@ func NewUploader(cfg aws.Config, options ...func(*Uploader)) *Uploader {
// u.PartSize = 64 * 1024 * 1024 // 64MB per part
// })
// NewUploaderWithClient creates a new Uploader instance to upload objects
// to S3 using the provided pre-configured client. Pass in additional
// functional options to customize the uploader's behavior.
func NewUploaderWithClient(svc s3iface.ClientAPI, options ...func(*Uploader)) *Uploader {
	// Delegate to the shared constructor so the default configuration
	// (part size, concurrency, buffer provider, etc.) lives in a single
	// place instead of being duplicated across both constructors.
	return newUploader(svc, options...)
}

// Upload uploads an object to S3, intelligently buffering large files into
Expand Down Expand Up @@ -356,6 +347,8 @@ type uploader struct {

readerPos int64 // current reader position
totalSize int64 // set to -1 if the size is not known

bufferPool sync.Pool
}

// internal logic for deciding whether to upload a single part or use a
Expand Down Expand Up @@ -392,6 +385,10 @@ func (u *uploader) init() error {
u.cfg.PartSize = DefaultUploadPartSize
}

u.bufferPool = sync.Pool{
New: func() interface{} { return make([]byte, u.cfg.PartSize) },
}

// Try to get the total size for some optimizations
return u.initSize()
}
Expand Down Expand Up @@ -460,11 +457,13 @@ func (u *uploader) nextReader() (io.ReadSeeker, int, func(), error) {
return reader, int(n), cleanup, err

default:
part := make([]byte, u.cfg.PartSize)
part := u.bufferPool.Get().([]byte)
n, err := readFillBuf(r, part)
u.readerPos += int64(n)

cleanup := func() {}
cleanup := func() {
u.bufferPool.Put(part)
}

return bytes.NewReader(part[0:n]), n, cleanup, err
}
Expand Down

0 comments on commit a11b7a1

Please sign in to comment.