S3Store: Concurrently write upload parts to S3 while reading from client #402

Merged on Jul 29, 2020 (22 commits)

Commits
87045a8 Allow empty metadata values (JenSte, Jun 20, 2020)
6c45ff2 Make tests less fragile by allowing loose call ordering (acj, Feb 28, 2020)
2b3fb49 Add s3ChunkProducer (acj, Mar 16, 2020)
54819d8 Integrate s3ChunkProducer to support chunk buffering (acj, Mar 16, 2020)
b72a4d4 Remove completed chunk files inline to reduce disk space usage (acj, Mar 16, 2020)
8d8046c Add tests for chunk producer (acj, Mar 20, 2020)
ec5f500 docs: Use value from Host header to forward to tusd (Acconut, Jun 24, 2020)
08a72a5 Use int64 for MaxBufferedParts field (acj, Jul 14, 2020)
c51afa1 Default to 20 buffered parts (acj, Jul 14, 2020)
ff63ab4 Rename s3ChunkProducer -> s3PartProducer (acj, Jul 14, 2020)
46e0e9c Document s3PartProducer struct (acj, Jul 14, 2020)
5dd7c3b Clarify misleading comment (acj, Jul 14, 2020)
be6cf54 Revert "Remove completed chunk files inline to reduce disk space usage" (acj, Jul 14, 2020)
74c5c0c Remove redundant seek (acj, Jul 14, 2020)
d014a3e Clean up any remaining files in the channel when we return (acj, Jul 14, 2020)
9cb1385 Make putPart* functions responsible for cleaning up temp files (acj, Jul 14, 2020)
59c3d42 Merge branch 'metadata' of https://github.com/JenSte/tusd (Acconut, Jul 15, 2020)
26b84bc handler: Add tests for empty metadata pairs (Acconut, Jul 15, 2020)
a23a1af Merge branch 's3store-buffered-chunks' of https://github.com/acj/tusd… (Acconut, Jul 18, 2020)
b79c64f Factor out cleanUpTempFile func (acj, Jul 20, 2020)
2cb30a4 Merge branch 's3store-buffered-chunks' of https://github.com/acj/tusd… (Acconut, Jul 22, 2020)
6984744 Add test to ensure that temporary files get cleaned up (Acconut, Jul 22, 2020)
examples/nginx.conf (1 addition, 1 deletion)
@@ -57,7 +57,7 @@ server {
proxy_http_version 1.1;

# Add X-Forwarded-* headers
proxy_set_header X-Forwarded-Host $hostname;
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Proto $scheme;

proxy_set_header Upgrade $http_upgrade;
pkg/handler/head_test.go (4 additions, 4 deletions)
@@ -26,8 +26,8 @@ func TestHead(t *testing.T) {
Offset: 11,
Size: 44,
MetaData: map[string]string{
"name": "lunrjs.png",
"type": "image/png",
"name": "lunrjs.png",
"empty": "",
},
}, nil),
lock.EXPECT().Unlock().Return(nil),
@@ -57,8 +57,8 @@ func TestHead(t *testing.T) {

// Since the order of a map is not guaranteed in Go, we need to be prepared
// for the case, that the order of the metadata may have been changed
if v := res.Header().Get("Upload-Metadata"); v != "name bHVucmpzLnBuZw==,type aW1hZ2UvcG5n" &&
v != "type aW1hZ2UvcG5n,name bHVucmpzLnBuZw==" {
if v := res.Header().Get("Upload-Metadata"); v != "name bHVucmpzLnBuZw==,empty " &&
v != "empty ,name bHVucmpzLnBuZw==" {
t.Errorf("Expected valid metadata (got '%s')", v)
}
})
pkg/handler/post_test.go (7 additions, 5 deletions)
@@ -23,16 +23,18 @@ func TestPost(t *testing.T) {
store.EXPECT().NewUpload(context.Background(), FileInfo{
Size: 300,
MetaData: map[string]string{
"foo": "hello",
"bar": "world",
"foo": "hello",
"bar": "world",
"empty": "",
},
}).Return(upload, nil),
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
ID: "foo",
Size: 300,
MetaData: map[string]string{
"foo": "hello",
"bar": "world",
"foo": "hello",
"bar": "world",
"empty": "",
},
}, nil),
)
@@ -52,7 +54,7 @@ func TestPost(t *testing.T) {
"Tus-Resumable": "1.0.0",
"Upload-Length": "300",
// Invalid Base64-encoded values should be ignored
"Upload-Metadata": "foo aGVsbG8=, bar d29ybGQ=, hah INVALID",
"Upload-Metadata": "foo aGVsbG8=, bar d29ybGQ=, hah INVALID, empty",
},
Code: http.StatusCreated,
ResHeader: map[string]string{
pkg/handler/unrouted_handler.go (14 additions, 6 deletions)
@@ -1120,19 +1120,27 @@ func ParseMetadataHeader(header string) map[string]string {

parts := strings.Split(element, " ")

// Do not continue with this element if no key and value or presented
if len(parts) != 2 {
if len(parts) > 2 {
continue
}

// Ignore corrent element if the value is no valid base64
key := parts[0]
value, err := base64.StdEncoding.DecodeString(parts[1])
if err != nil {
if key == "" {
continue
}

meta[key] = string(value)
value := ""
if len(parts) == 2 {
// Ignore current element if the value is not valid base64
dec, err := base64.StdEncoding.DecodeString(parts[1])
if err != nil {
continue
}

value = string(dec)
}

meta[key] = value
}

return meta
pkg/handler/unrouted_handler_test.go (new file, 35 additions)
@@ -0,0 +1,35 @@
package handler_test

import (
"testing"

"github.com/stretchr/testify/assert"

. "github.com/tus/tusd/pkg/handler"
)

func TestParseMetadataHeader(t *testing.T) {
a := assert.New(t)

md := ParseMetadataHeader("")
a.Equal(md, map[string]string{})

// Invalidly encoded values are ignored
md = ParseMetadataHeader("k1 INVALID")
a.Equal(md, map[string]string{})

// If the same key occurs multiple times, the last one wins
md = ParseMetadataHeader("k1 aGVsbG8=,k1 d29ybGQ=")
a.Equal(md, map[string]string{
"k1": "world",
})

// Empty values are mapped to an empty string
md = ParseMetadataHeader("k1 aGVsbG8=, k2, k3 , k4 d29ybGQ=")
a.Equal(md, map[string]string{
"k1": "hello",
"k2": "",
"k3": "",
"k4": "world",
})
}
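
The test above pins down the parsing side of empty metadata values. For the client side, the following is a minimal sketch (not part of the PR) of serializing an Upload-Metadata header in the format that ParseMetadataHeader accepts; the helper name buildMetadataHeader is made up for illustration.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// buildMetadataHeader is a hypothetical helper that serializes metadata into
// the Upload-Metadata format parsed above: pairs are comma-separated, each
// pair is "key base64(value)", and a pair with an empty value is written as
// the bare key.
func buildMetadataHeader(meta map[string]string) string {
	pairs := make([]string, 0, len(meta))
	for key, value := range meta {
		if value == "" {
			pairs = append(pairs, key)
			continue
		}
		pairs = append(pairs, key+" "+base64.StdEncoding.EncodeToString([]byte(value)))
	}
	return strings.Join(pairs, ",")
}

func main() {
	// Map iteration order is random, so this prints either
	// "name bHVucmpzLnBuZw==,empty" or "empty,name bHVucmpzLnBuZw==".
	fmt.Println(buildMetadataHeader(map[string]string{
		"name":  "lunrjs.png",
		"empty": "",
	}))
}
```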
pkg/s3store/s3store.go (131 additions, 46 deletions)
@@ -135,6 +135,15 @@ type S3Store struct {
// MaxObjectSize is the maximum size an S3 Object can have according to S3
// API specifications. See link above.
MaxObjectSize int64
// MaxBufferedParts is the number of additional parts that can be received from
// the client and stored on disk while a part is being uploaded to S3. This
// can help improve throughput by not blocking the client while tusd is
// communicating with the S3 API, which can have unpredictable latency.
MaxBufferedParts int64
// TemporaryDirectory is the path where S3Store will create temporary files
// on disk during the upload. An empty string ("", the default value) will
// cause S3Store to use the operating system's default temporary directory.
TemporaryDirectory string
}

type S3API interface {
@@ -153,12 +162,14 @@ type S3API interface {
// New constructs a new storage using the supplied bucket and service object.
func New(bucket string, service S3API) S3Store {
return S3Store{
Bucket: bucket,
Service: service,
MaxPartSize: 5 * 1024 * 1024 * 1024,
MinPartSize: 5 * 1024 * 1024,
MaxMultipartParts: 10000,
MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024,
Bucket: bucket,
Service: service,
MaxPartSize: 5 * 1024 * 1024 * 1024,
MinPartSize: 5 * 1024 * 1024,
MaxMultipartParts: 10000,
MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024,
MaxBufferedParts: 20,
TemporaryDirectory: "",
}
}
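
As a usage sketch (not part of the diff), the two new fields can be tuned after constructing the store. The bucket name, region, and scratch path below are illustrative assumptions; credentials come from the usual AWS environment.

```go
package main

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"

	"github.com/tus/tusd/pkg/s3store"
)

func newTunedStore() s3store.S3Store {
	// Placeholder session; region and credentials are assumptions for this example.
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String("us-east-1"),
	}))

	store := s3store.New("my-example-bucket", s3.New(sess))

	// Buffer up to 40 parts on disk while earlier parts are still being
	// uploaded to S3 (this PR's default is 20), and keep the temporary part
	// files on a dedicated scratch volume instead of the OS default.
	store.MaxBufferedParts = 40
	store.TemporaryDirectory = "/mnt/scratch/tusd"

	return store
}
```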

@@ -272,6 +283,73 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er
return err
}

// s3PartProducer converts a stream of bytes from the reader into a stream of files on disk
type s3PartProducer struct {
store *S3Store
files chan<- *os.File
done chan struct{}
err error
r io.Reader
}

func (spp *s3PartProducer) produce(partSize int64) {
for {
file, err := spp.nextPart(partSize)
if err != nil {
spp.err = err
close(spp.files)
return
}
if file == nil {
close(spp.files)
return
}
select {
case spp.files <- file:
case <-spp.done:
close(spp.files)
return
}
}
}

func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) {
// Create a temporary file to store the part
file, err := ioutil.TempFile(spp.store.TemporaryDirectory, "tusd-s3-tmp-")
if err != nil {
return nil, err
}

limitedReader := io.LimitReader(spp.r, size)
n, err := io.Copy(file, limitedReader)

// If the HTTP PATCH request gets interrupted in the middle (e.g. because
// the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
// However, for S3Store it's not important whether the stream has ended
// on purpose or accidentally. Therefore, we ignore this error to avoid
// preventing the remaining chunk from being stored on S3.
if err == io.ErrUnexpectedEOF {
err = nil
}

if err != nil {
return nil, err
}

// If the entire request body is read and no more data is available,
// io.Copy returns 0 since it is unable to read any bytes. In that
// case, we can close the s3PartProducer.
if n == 0 {
cleanUpTempFile(file)
return nil, nil
}

// Seek to the beginning of the file
file.Seek(0, 0)

return file, nil
}
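
To show how the producer is driven in isolation, here is an illustrative, test-style sketch in the s3store package (the PR's real tests come from the "Add tests for chunk producer" commit; the test name and expectations below are assumptions, not copied from them).

```go
package s3store

import (
	"os"
	"strings"
	"testing"
)

// Illustrative sketch only: feed a small reader through s3PartProducer and
// check that it is cut into parts of at most the requested size.
func TestPartProducerSketch(t *testing.T) {
	store := New("test-bucket", nil)

	fileChan := make(chan *os.File)
	doneChan := make(chan struct{})
	defer close(doneChan)

	producer := s3PartProducer{
		store: &store,
		files: fileChan,
		done:  doneChan,
		r:     strings.NewReader("0123456789"),
	}
	go producer.produce(4) // split the 10-byte input into parts of at most 4 bytes

	var sizes []int64
	for file := range fileChan {
		stat, err := file.Stat()
		if err != nil {
			t.Fatal(err)
		}
		sizes = append(sizes, stat.Size())
		cleanUpTempFile(file)
	}
	if producer.err != nil {
		t.Fatal(producer.err)
	}
	// Expect parts of 4, 4 and 2 bytes.
	if len(sizes) != 3 || sizes[0] != 4 || sizes[1] != 4 || sizes[2] != 2 {
		t.Errorf("unexpected part sizes: %v", sizes)
	}
}
```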

func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
id := upload.id
store := upload.store
@@ -305,8 +383,7 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
return 0, err
}
if incompletePartFile != nil {
defer os.Remove(incompletePartFile.Name())
defer incompletePartFile.Close()
defer cleanUpTempFile(incompletePartFile)

if err := store.deleteIncompletePartForUpload(ctx, uploadId); err != nil {
return 0, err
@@ -315,49 +392,43 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
src = io.MultiReader(incompletePartFile, src)
}

for {
// Create a temporary file to store the part in it
file, err := ioutil.TempFile("", "tusd-s3-tmp-")
if err != nil {
return bytesUploaded, err
}
defer os.Remove(file.Name())
defer file.Close()

limitedReader := io.LimitReader(src, optimalPartSize)
n, err := io.Copy(file, limitedReader)

// If the HTTP PATCH request gets interrupted in the middle (e.g. because
// the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
// However, for S3Store it's not important whether the stream has ended
// on purpose or accidentally. Therefore, we ignore this error to not
// prevent the remaining chunk to be stored on S3.
if err == io.ErrUnexpectedEOF {
err = nil
fileChan := make(chan *os.File, store.MaxBufferedParts)
doneChan := make(chan struct{})
defer close(doneChan)

// If we panic or return while there are still files in the channel, then
// we may leak file descriptors. Let's ensure that those are cleaned up.
defer func() {
for file := range fileChan {
cleanUpTempFile(file)
}
}()

partProducer := s3PartProducer{
store: store,
done: doneChan,
files: fileChan,
r: src,
}
go partProducer.produce(optimalPartSize)

// io.Copy does not return io.EOF, so we do not have to handle it differently.
for file := range fileChan {
stat, err := file.Stat()
if err != nil {
return bytesUploaded, err
}
// If io.Copy is finished reading, it will always return (0, nil).
if n == 0 {
return (bytesUploaded - incompletePartSize), nil
return 0, err
}

// Seek to the beginning of the file
file.Seek(0, 0)
n := stat.Size()

isFinalChunk := !info.SizeIsDeferred && (size == (offset-incompletePartSize)+n)
if n >= store.MinPartSize || isFinalChunk {
_, err = store.Service.UploadPartWithContext(ctx, &s3.UploadPartInput{
uploadPartInput := &s3.UploadPartInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(uploadId),
UploadId: aws.String(multipartId),
PartNumber: aws.Int64(nextPartNum),
Body: file,
})
if err != nil {
}
if err := upload.putPartForUpload(ctx, uploadPartInput, file); err != nil {
return bytesUploaded, err
}
} else {
@@ -374,6 +445,20 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
bytesUploaded += n
nextPartNum += 1
}

return bytesUploaded - incompletePartSize, partProducer.err
}

func cleanUpTempFile(file *os.File) {
file.Close()
os.Remove(file.Name())
}

func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File) error {
defer cleanUpTempFile(file)

_, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput)
return err
}

func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) {
@@ -643,13 +728,11 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads
uploadId, multipartId := splitIds(id)

// Create a temporary file for holding the concatenated data
file, err := ioutil.TempFile("", "tusd-s3-concat-tmp-")
file, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-concat-tmp-")
if err != nil {
return err
}
fmt.Println(file.Name())
defer os.Remove(file.Name())
defer file.Close()
defer cleanUpTempFile(file)

// Download each part and append it to the temporary file
for _, partialUpload := range partialUploads {
@@ -790,7 +873,7 @@ func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, upload
}
defer incompleteUploadObject.Body.Close()

partFile, err := ioutil.TempFile("", "tusd-s3-tmp-")
partFile, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-tmp-")
if err != nil {
return nil, 0, err
}
@@ -824,11 +907,13 @@ func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId st
return obj, err
}

func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, r io.ReadSeeker) error {
func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error {
defer cleanUpTempFile(file)

_, err := store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.metadataKeyWithPrefix(uploadId + ".part"),
Body: r,
Body: file,
})
return err
}