s3Store: Concurrently write upload parts to S3 while reading from client (tus#402)

* Allow empty metadata values

* Make tests less fragile by allowing loose call ordering

* Add s3ChunkProducer

* Integrate s3ChunkProducer to support chunk buffering

* Remove completed chunk files inline to reduce disk space usage

* Add tests for chunk producer

* docs: Use value from Host header to forward to tusd

* Use int64 for MaxBufferedParts field

* Default to 20 buffered parts

* Rename s3ChunkProducer -> s3PartProducer

* Document s3PartProducer struct

* Clarify misleading comment

* Revert "Remove completed chunk files inline to reduce disk space usage"

This reverts commit b72a4d4.

* Remove redundant seek

This is already being done in s3PartProducer.

* Clean up any remaining files in the channel when we return

* Make putPart* functions responsible for cleaning up temp files

* handler: Add tests for empty metadata pairs

* Factor out cleanUpTempFile func

* Add test to ensure that temporary files get cleaned up

Co-authored-by: Jens Steinhauser <jens.steinhauser@gmail.com>
Co-authored-by: Marius <marius@transloadit.com>
3 people authored and justinruggles committed Oct 26, 2020
1 parent cc74339 commit f8ab542
Showing 3 changed files with 563 additions and 300 deletions.
177 changes: 131 additions & 46 deletions pkg/s3store/s3store.go
@@ -135,6 +135,15 @@ type S3Store struct {
// MaxObjectSize is the maximum size an S3 Object can have according to S3
// API specifications. See link above.
MaxObjectSize int64
// MaxBufferedParts is the number of additional parts that can be received from
// the client and stored on disk while a part is being uploaded to S3. This
// can help improve throughput by not blocking the client while tusd is
// communicating with the S3 API, which can have unpredictable latency.
MaxBufferedParts int64
// TemporaryDirectory is the path where S3Store will create temporary files
// on disk during the upload. An empty string ("", the default value) will
// cause S3Store to use the operating system's default temporary directory.
TemporaryDirectory string
}

type S3API interface {
@@ -153,12 +162,14 @@ type S3API interface {
// New constructs a new storage using the supplied bucket and service object.
func New(bucket string, service S3API) S3Store {
return S3Store{
Bucket: bucket,
Service: service,
MaxPartSize: 5 * 1024 * 1024 * 1024,
MinPartSize: 5 * 1024 * 1024,
MaxMultipartParts: 10000,
MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024,
Bucket: bucket,
Service: service,
MaxPartSize: 5 * 1024 * 1024 * 1024,
MinPartSize: 5 * 1024 * 1024,
MaxMultipartParts: 10000,
MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024,
MaxBufferedParts: 20,
TemporaryDirectory: "",
}
}
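
Both new fields are plain exported struct fields, so they can be tuned after constructing the store. A minimal configuration sketch, not part of this diff, using a hypothetical bucket, region, and scratch path with the usual aws-sdk-go v1 setup:

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"

	"github.com/tus/tusd/pkg/s3store"
)

func newTunedStore() s3store.S3Store {
	sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))

	store := s3store.New("example-bucket", s3.New(sess))
	// Allow up to 40 parts to sit on disk while earlier parts are still being
	// uploaded to S3 (the default introduced in this commit is 20).
	store.MaxBufferedParts = 40
	// Keep temporary part files on a dedicated scratch volume instead of the
	// operating system's default temporary directory.
	store.TemporaryDirectory = "/mnt/scratch"
	return store
}

Raising MaxBufferedParts trades additional disk usage for smoother client throughput when S3 latency is high.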

@@ -272,6 +283,73 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er
return err
}

// s3PartProducer converts a stream of bytes from the reader into a stream of files on disk
type s3PartProducer struct {
store *S3Store
files chan<- *os.File
done chan struct{}
err error
r io.Reader
}

func (spp *s3PartProducer) produce(partSize int64) {
for {
file, err := spp.nextPart(partSize)
if err != nil {
spp.err = err
close(spp.files)
return
}
if file == nil {
close(spp.files)
return
}
select {
case spp.files <- file:
case <-spp.done:
close(spp.files)
return
}
}
}

func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) {
// Create a temporary file to store the part
file, err := ioutil.TempFile(spp.store.TemporaryDirectory, "tusd-s3-tmp-")
if err != nil {
return nil, err
}

limitedReader := io.LimitReader(spp.r, size)
n, err := io.Copy(file, limitedReader)

// If the HTTP PATCH request gets interrupted in the middle (e.g. because
// the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
// However, for S3Store it's not important whether the stream has ended
// on purpose or accidentally. Therefore, we ignore this error so it does not
// prevent the remaining chunk from being stored on S3.
if err == io.ErrUnexpectedEOF {
err = nil
}

if err != nil {
return nil, err
}

// If the entire request body is read and no more data is available,
// io.Copy returns 0 since it is unable to read any bytes. In that
// case, we can close the s3PartProducer.
if n == 0 {
cleanUpTempFile(file)
return nil, nil
}

// Seek to the beginning of the file
file.Seek(0, 0)

return file, nil
}
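
To make the channel protocol easier to follow in isolation, here is a distilled, hypothetical consumer of s3PartProducer. The real consumer is WriteChunk below; store, src, and partSize stand in for the values it already has:

files := make(chan *os.File, store.MaxBufferedParts)
done := make(chan struct{})
// Closing done tells the producer to stop early if the consumer returns.
defer close(done)

pp := s3PartProducer{store: store, files: files, done: done, r: src}
go pp.produce(partSize)

// The producer closes files once the reader is drained or an error occurs.
for file := range files {
	// ... upload the part, then delete the temporary file ...
	cleanUpTempFile(file)
}

// pp.err is safe to read after files is closed; it is non-nil if buffering failed.
if pp.err != nil {
	// handle the error
}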

func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
id := upload.id
store := upload.store
@@ -305,8 +383,7 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
return 0, err
}
if incompletePartFile != nil {
defer os.Remove(incompletePartFile.Name())
defer incompletePartFile.Close()
defer cleanUpTempFile(incompletePartFile)

if err := store.deleteIncompletePartForUpload(ctx, uploadId); err != nil {
return 0, err
@@ -315,49 +392,43 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
src = io.MultiReader(incompletePartFile, src)
}

for {
// Create a temporary file to store the part in it
file, err := ioutil.TempFile("", "tusd-s3-tmp-")
if err != nil {
return bytesUploaded, err
}
defer os.Remove(file.Name())
defer file.Close()

limitedReader := io.LimitReader(src, optimalPartSize)
n, err := io.Copy(file, limitedReader)

// If the HTTP PATCH request gets interrupted in the middle (e.g. because
// the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
// However, for S3Store it's not important whether the stream has ended
// on purpose or accidentally. Therefore, we ignore this error so it does not
// prevent the remaining chunk from being stored on S3.
if err == io.ErrUnexpectedEOF {
err = nil
fileChan := make(chan *os.File, store.MaxBufferedParts)
doneChan := make(chan struct{})
defer close(doneChan)

// If we panic or return while there are still files in the channel, then
// we may leak file descriptors. Let's ensure that those are cleaned up.
defer func() {
for file := range fileChan {
cleanUpTempFile(file)
}
}()

partProducer := s3PartProducer{
store: store,
done: doneChan,
files: fileChan,
r: src,
}
go partProducer.produce(optimalPartSize)

// io.Copy does not return io.EOF, so we do not have to handle it differently.
for file := range fileChan {
stat, err := file.Stat()
if err != nil {
return bytesUploaded, err
}
// If io.Copy is finished reading, it will always return (0, nil).
if n == 0 {
return (bytesUploaded - incompletePartSize), nil
return 0, err
}

// Seek to the beginning of the file
file.Seek(0, 0)
n := stat.Size()

isFinalChunk := !info.SizeIsDeferred && (size == (offset-incompletePartSize)+n)
if n >= store.MinPartSize || isFinalChunk {
_, err = store.Service.UploadPartWithContext(ctx, &s3.UploadPartInput{
uploadPartInput := &s3.UploadPartInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(uploadId),
UploadId: aws.String(multipartId),
PartNumber: aws.Int64(nextPartNum),
Body: file,
})
if err != nil {
}
if err := upload.putPartForUpload(ctx, uploadPartInput, file); err != nil {
return bytesUploaded, err
}
} else {
@@ -374,6 +445,20 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
bytesUploaded += n
nextPartNum += 1
}

return bytesUploaded - incompletePartSize, partProducer.err
}

func cleanUpTempFile(file *os.File) {
file.Close()
os.Remove(file.Name())
}

func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File) error {
defer cleanUpTempFile(file)

_, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput)
return err
}
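
Because every code path now funnels temporary-file deletion through cleanUpTempFile, the cleanup behaviour can be checked by pointing TemporaryDirectory at a per-test directory. A rough in-package test sketch, not the actual s3store_test.go change from this commit, assuming a hypothetical mockS3API and upload setup:

func TestTempFilesAreRemoved(t *testing.T) {
	dir, err := ioutil.TempDir("", "tusd-s3-cleanup-test-")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	store := New("bucket", mockS3API) // mockS3API stands in for a mocked S3API
	store.TemporaryDirectory = dir

	// ... create an upload with store.NewUpload and write a chunk through
	// its WriteChunk method ...

	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		t.Fatal(err)
	}
	if len(entries) != 0 {
		t.Errorf("expected no temporary files to be left behind, found %d", len(entries))
	}
}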

func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) {
@@ -643,13 +728,11 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads
uploadId, multipartId := splitIds(id)

// Create a temporary file for holding the concatenated data
file, err := ioutil.TempFile("", "tusd-s3-concat-tmp-")
file, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-concat-tmp-")
if err != nil {
return err
}
fmt.Println(file.Name())
defer os.Remove(file.Name())
defer file.Close()
defer cleanUpTempFile(file)

// Download each part and append it to the temporary file
for _, partialUpload := range partialUploads {
@@ -790,7 +873,7 @@ func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, upload
}
defer incompleteUploadObject.Body.Close()

partFile, err := ioutil.TempFile("", "tusd-s3-tmp-")
partFile, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-tmp-")
if err != nil {
return nil, 0, err
}
@@ -824,11 +907,13 @@ func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId st
return obj, err
}

func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, r io.ReadSeeker) error {
func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error {
defer cleanUpTempFile(file)

_, err := store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.metadataKeyWithPrefix(uploadId + ".part"),
Body: r,
Body: file,
})
return err
}
