S3Store: Concurrently write upload parts to S3 while reading from client #402

Merged: merged 22 commits, Jul 29, 2020
Commits (22)
87045a8  Allow empty metadata values (JenSte, Jun 20, 2020)
6c45ff2  Make tests less fragile by allowing loose call ordering (acj, Feb 28, 2020)
2b3fb49  Add s3ChunkProducer (acj, Mar 16, 2020)
54819d8  Integrate s3ChunkProducer to support chunk buffering (acj, Mar 16, 2020)
b72a4d4  Remove completed chunk files inline to reduce disk space usage (acj, Mar 16, 2020)
8d8046c  Add tests for chunk producer (acj, Mar 20, 2020)
ec5f500  docs: Use value from Host header to forward to tusd (Acconut, Jun 24, 2020)
08a72a5  Use int64 for MaxBufferedParts field (acj, Jul 14, 2020)
c51afa1  Default to 20 buffered parts (acj, Jul 14, 2020)
ff63ab4  Rename s3ChunkProducer -> s3PartProducer (acj, Jul 14, 2020)
46e0e9c  Document s3PartProducer struct (acj, Jul 14, 2020)
5dd7c3b  Clarify misleading comment (acj, Jul 14, 2020)
be6cf54  Revert "Remove completed chunk files inline to reduce disk space usage" (acj, Jul 14, 2020)
74c5c0c  Remove redundant seek (acj, Jul 14, 2020)
d014a3e  Clean up any remaining files in the channel when we return (acj, Jul 14, 2020)
9cb1385  Make putPart* functions responsible for cleaning up temp files (acj, Jul 14, 2020)
59c3d42  Merge branch 'metadata' of https://github.com/JenSte/tusd (Acconut, Jul 15, 2020)
26b84bc  handler: Add tests for empty metadata pairs (Acconut, Jul 15, 2020)
a23a1af  Merge branch 's3store-buffered-chunks' of https://github.com/acj/tusd… (Acconut, Jul 18, 2020)
b79c64f  Factor out cleanUpTempFile func (acj, Jul 20, 2020)
2cb30a4  Merge branch 's3store-buffered-chunks' of https://github.com/acj/tusd… (Acconut, Jul 22, 2020)
6984744  Add test to ensure that temporary files get cleaned up (Acconut, Jul 22, 2020)
33 changes: 19 additions & 14 deletions pkg/s3store/s3store.go
@@ -140,6 +140,10 @@ type S3Store struct {
// can help improve throughput by not blocking the client while tusd is
// communicating with the S3 API, which can have unpredictable latency.
MaxBufferedParts int64
// TemporaryDirectory is the path where S3Store will create temporary files
// on disk during the upload. An empty string ("", the default value) will
// cause S3Store to use the operating system's default temporary directory.
TemporaryDirectory string
}

type S3API interface {
@@ -158,13 +162,14 @@ type S3API interface {
// New constructs a new storage using the supplied bucket and service object.
func New(bucket string, service S3API) S3Store {
return S3Store{
Bucket: bucket,
Service: service,
MaxPartSize: 5 * 1024 * 1024 * 1024,
MinPartSize: 5 * 1024 * 1024,
MaxMultipartParts: 10000,
MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024,
MaxBufferedParts: 20,
Bucket: bucket,
Service: service,
MaxPartSize: 5 * 1024 * 1024 * 1024,
MinPartSize: 5 * 1024 * 1024,
MaxMultipartParts: 10000,
MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024,
MaxBufferedParts: 20,
TemporaryDirectory: "",
}
}
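As a usage note for the new field (a sketch under assumed names, not part of this diff): a caller sets TemporaryDirectory the same way as the other S3Store options, after calling New. The bucket name, AWS session setup, and directory path below are placeholders.

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/tus/tusd/pkg/s3store"
)

func main() {
	// Placeholder AWS session and bucket name.
	sess := session.Must(session.NewSession())
	store := s3store.New("my-bucket", s3.New(sess))

	// Keep at most 20 parts buffered on disk while earlier parts are still
	// being sent to S3 (20 is also the default set by New).
	store.MaxBufferedParts = 20

	// Write the temporary part files to a dedicated directory instead of the
	// operating system's default temporary directory.
	store.TemporaryDirectory = "/mnt/tusd-scratch"

	fmt.Printf("buffering up to %d parts in %s\n", store.MaxBufferedParts, store.TemporaryDirectory)
}
```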

@@ -280,6 +285,7 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) error {

// s3PartProducer converts a stream of bytes from the reader into a stream of files on disk
type s3PartProducer struct {
store *S3Store
files chan<- *os.File
done chan struct{}
err error
@@ -309,7 +315,7 @@ func (spp *s3PartProducer) produce(partSize int64) {

func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) {
// Create a temporary file to store the part
file, err := ioutil.TempFile("", "tusd-s3-tmp-")
file, err := ioutil.TempFile(spp.store.TemporaryDirectory, "tusd-s3-tmp-")
if err != nil {
return nil, err
}
@@ -334,8 +340,7 @@ func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) {
// io.Copy returns 0 since it is unable to read any bytes. In that
// case, we can close the s3PartProducer.
if n == 0 {
os.Remove(file.Name())
Review comment from the PR author: Thanks for catching these 👍

file.Close()
cleanUpTempFile(file)
return nil, nil
}

@@ -378,8 +383,7 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
return 0, err
}
if incompletePartFile != nil {
defer os.Remove(incompletePartFile.Name())
defer incompletePartFile.Close()
defer cleanUpTempFile(incompletePartFile)

if err := store.deleteIncompletePartForUpload(ctx, uploadId); err != nil {
return 0, err
@@ -401,6 +405,7 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
}()

partProducer := s3PartProducer{
store: store,
done: doneChan,
files: fileChan,
r: src,
@@ -723,7 +728,7 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads
uploadId, multipartId := splitIds(id)

// Create a temporary file for holding the concatenated data
file, err := ioutil.TempFile("", "tusd-s3-concat-tmp-")
file, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-concat-tmp-")
if err != nil {
return err
}
@@ -868,7 +873,7 @@ func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, upload
}
defer incompleteUploadObject.Body.Close()

partFile, err := ioutil.TempFile("", "tusd-s3-tmp-")
partFile, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-tmp-")
if err != nil {
return nil, 0, err
}
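Stepping back from the individual hunks, the pattern this file now implements is a producer/consumer handoff: the part producer reads from the client and stages fixed-size parts as temporary files (in TemporaryDirectory), while WriteChunk consumes those files, uploads them, and removes them afterwards. The sketch below is a simplified illustration of that flow, not the code from this PR; only cleanUpTempFile mirrors the helper factored out above, and the other names are assumptions.

```go
package sketch

import (
	"io"
	"io/ioutil"
	"os"
)

// cleanUpTempFile closes a temporary part file and removes it from disk. It
// mirrors the helper this PR factors out of the inline Close/Remove calls.
func cleanUpTempFile(file *os.File) {
	file.Close()
	os.Remove(file.Name())
}

// producePart stages up to partSize bytes from r in a temporary file inside
// tempDir. It returns nil (and no error) once the reader is exhausted.
func producePart(r io.Reader, partSize int64, tempDir string) (*os.File, error) {
	file, err := ioutil.TempFile(tempDir, "tusd-s3-tmp-")
	if err != nil {
		return nil, err
	}

	n, err := io.Copy(file, io.LimitReader(r, partSize))
	if err != nil {
		cleanUpTempFile(file)
		return nil, err
	}
	if n == 0 {
		// Nothing was read, so the source is drained; drop the empty file.
		cleanUpTempFile(file)
		return nil, nil
	}

	// Rewind so the consumer can read the staged part from the beginning.
	if _, err := file.Seek(0, io.SeekStart); err != nil {
		cleanUpTempFile(file)
		return nil, err
	}
	return file, nil
}

// consumeParts uploads every staged part and always removes the temporary
// file, whether or not the upload succeeded.
func consumeParts(files <-chan *os.File, uploadPart func(*os.File) error) error {
	for file := range files {
		err := uploadPart(file)
		cleanUpTempFile(file)
		if err != nil {
			return err
		}
	}
	return nil
}
```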
6 changes: 5 additions & 1 deletion pkg/s3store/s3store_part_producer_test.go
@@ -27,6 +27,7 @@ func TestPartProducerConsumesEntireReaderWithoutError(t *testing.T) {
expectedStr := "test"
r := strings.NewReader(expectedStr)
pp := s3PartProducer{
store: &S3Store{},
done: doneChan,
files: fileChan,
r: r,
@@ -62,6 +63,7 @@ func TestPartProducerExitsWhenDoneChannelIsClosed(t *testing.T) {
fileChan := make(chan *os.File)
doneChan := make(chan struct{})
pp := s3PartProducer{
store: &S3Store{},
done: doneChan,
files: fileChan,
r: InfiniteZeroReader{},
@@ -89,6 +91,7 @@ func TestPartProducerExitsWhenDoneChannelIsClosedBeforeAnyPartIsSent(t *testing.T) {
fileChan := make(chan *os.File)
doneChan := make(chan struct{})
pp := s3PartProducer{
store: &S3Store{},
done: doneChan,
files: fileChan,
r: InfiniteZeroReader{},
@@ -116,6 +119,7 @@ func TestPartProducerExitsWhenUnableToReadFromFile(t *testing.T) {
fileChan := make(chan *os.File)
doneChan := make(chan struct{})
pp := s3PartProducer{
store: &S3Store{},
done: doneChan,
files: fileChan,
r: ErrorReader{},
@@ -152,4 +156,4 @@ func safelyDrainChannelOrFail(c chan *os.File, t *testing.T) {
}

t.Fatal("timed out waiting for channel to drain")
}
}
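The tests in this file lean on two small reader helpers, InfiniteZeroReader and ErrorReader, whose definitions sit outside the visible diff. A minimal sketch of what such helpers conventionally look like (assumed here, not copied from the repository):

```go
package s3store

import "errors"

// InfiniteZeroReader never reports io.EOF, so the part producer keeps
// emitting parts until its done channel is closed.
type InfiniteZeroReader struct{}

func (InfiniteZeroReader) Read(b []byte) (int, error) {
	for i := range b {
		b[i] = 0
	}
	return len(b), nil
}

// ErrorReader fails on every Read, which lets the tests drive the
// producer's error path.
type ErrorReader struct{}

func (ErrorReader) Read(b []byte) (int, error) {
	return 0, errors.New("error from ErrorReader")
}
```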
98 changes: 98 additions & 0 deletions pkg/s3store/s3store_test.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"io/ioutil"
"strings"
"testing"
"time"

@@ -14,6 +15,7 @@

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/tus/tusd/pkg/handler"
)
@@ -1317,3 +1319,99 @@ func TestConcatUploadsUsingDownload(t *testing.T) {
// Wait a short delay until the call to AbortMultipartUploadWithContext also occurs.
<-time.After(10 * time.Millisecond)
}

type s3APIWithTempFileAssertion struct {
*MockS3API
assert *assert.Assertions
tempDir string
}

func (s s3APIWithTempFileAssertion) UploadPartWithContext(context.Context, *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) {
assert := s.assert

// Make sure that only the two temporary files from tusd are in here.
files, err := ioutil.ReadDir(s.tempDir)
assert.Nil(err)
for _, file := range files {
assert.True(strings.HasPrefix(file.Name(), "tusd-s3-tmp-"))
}
assert.Equal(len(files), 2)

return nil, fmt.Errorf("not now")
}

// This test ensures that the S3Store will cleanup all files that it creates during
// a call to WriteChunk, even if an error occurs during that invocation.
// Here, we provide 14 bytes to WriteChunk and since the PartSize is set to 10,
// it will split the input into two parts (10 bytes and 4 bytes).
// Inside the first call to UploadPartWithContext, we assert that the temporary files
// for both parts have been created and we return an error.
// In the end, we assert that the error bubbled up and that all temporary files have
// been cleaned up.
func TestWriteChunkCleansUpTempFiles(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)

// Create a temporary directory, so no files get mixed in.
tempDir, err := ioutil.TempDir("", "tusd-s3-cleanup-tests-")
assert.Nil(err)

s3obj := NewMockS3API(mockCtrl)
s3api := s3APIWithTempFileAssertion{
MockS3API: s3obj,
assert: assert,
tempDir: tempDir,
}
store := New("bucket", s3api)
store.MaxPartSize = 10
store.MinPartSize = 10
store.MaxMultipartParts = 10000
store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
store.TemporaryDirectory = tempDir

// The usual S3 calls for retrieving the upload
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{
{
Size: aws.Int64(100),
},
{
Size: aws.Int64(200),
},
},
}, nil).Times(2)
Review comment from the PR author, quoting ".Times(2)":

👍 It's out of scope here, but I think there are a number of places where we could combine duplicate EXPECT() calls now that the call ordering is less strict.
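As a sketch of that suggestion (a fragment meant to sit inside a test like the one above; the request and response values are placeholders, not code from this PR), gomock lets one expectation combined with Times(2) stand in for two identical EXPECT() calls once strict ordering is no longer required:

```go
// Where a test currently repeats an identical expectation twice, a single
// EXPECT() with Times(2) can replace both.
s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{
	Bucket: aws.String("bucket"),
	Key:    aws.String("uploadId.part"),
}).Return(&s3.DeleteObjectOutput{}, nil).Times(2)
```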

s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil))
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil))

// No calls to s3obj.EXPECT().UploadPartWithContext since that is handled by s3APIWithTempFileAssertion

upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)

bytesRead, err := upload.WriteChunk(context.Background(), 300, bytes.NewReader([]byte("1234567890ABCD")))
assert.NotNil(err)
assert.Equal(err.Error(), "not now")
assert.Equal(int64(0), bytesRead)

files, err := ioutil.ReadDir(tempDir)
assert.Nil(err)
assert.Equal(len(files), 0)
}