-
Notifications
You must be signed in to change notification settings - Fork 495
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
S3Store: Concurrently write upload parts to S3 while reading from client #402
Merged
Merged
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
87045a8
Allow empty metadata values
JenSte 6c45ff2
Make tests less fragile by allowing loose call ordering
acj 2b3fb49
Add s3ChunkProducer
acj 54819d8
Integrate s3ChunkProducer to support chunk buffering
acj b72a4d4
Remove completed chunk files inline to reduce disk space usage
acj 8d8046c
Add tests for chunk producer
acj ec5f500
docs: Use value from Host header to forward to tusd
Acconut 08a72a5
Use int64 for MaxBufferedParts field
acj c51afa1
Default to 20 buffered parts
acj ff63ab4
Rename s3ChunkProducer -> s3PartProducer
acj 46e0e9c
Document s3PartProducer struct
acj 5dd7c3b
Clarify misleading comment
acj be6cf54
Revert "Remove completed chunk files inline to reduce disk space usage"
acj 74c5c0c
Remove redundant seek
acj d014a3e
Clean up any remaining files in the channel when we return
acj 9cb1385
Make putPart* functions responsible for cleaning up temp files
acj 59c3d42
Merge branch 'metadata' of https://github.com/JenSte/tusd
Acconut 26b84bc
handler: Add tests for empty metadata pairs
Acconut a23a1af
Merge branch 's3store-buffered-chunks' of https://github.com/acj/tusd…
Acconut b79c64f
Factor out cleanUpTempFile func
acj 2cb30a4
Merge branch 's3store-buffered-chunks' of https://github.com/acj/tusd…
Acconut 6984744
Add test to ensure that temporary files get cleaned up
Acconut File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ import ( | |
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
|
@@ -14,6 +15,7 @@ import ( | |
|
||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/awserr" | ||
"github.com/aws/aws-sdk-go/aws/request" | ||
"github.com/aws/aws-sdk-go/service/s3" | ||
"github.com/tus/tusd/pkg/handler" | ||
) | ||
|
@@ -1317,3 +1319,99 @@ func TestConcatUploadsUsingDownload(t *testing.T) { | |
// Wait a short delay until the call to AbortMultipartUploadWithContext also occurs. | ||
<-time.After(10 * time.Millisecond) | ||
} | ||
|
||
type s3APIWithTempFileAssertion struct { | ||
*MockS3API | ||
assert *assert.Assertions | ||
tempDir string | ||
} | ||
|
||
func (s s3APIWithTempFileAssertion) UploadPartWithContext(context.Context, *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) { | ||
assert := s.assert | ||
|
||
// Make sure that only the two temporary files from tusd are in here. | ||
files, err := ioutil.ReadDir(s.tempDir) | ||
assert.Nil(err) | ||
for _, file := range files { | ||
assert.True(strings.HasPrefix(file.Name(), "tusd-s3-tmp-")) | ||
} | ||
assert.Equal(len(files), 2) | ||
|
||
return nil, fmt.Errorf("not now") | ||
} | ||
|
||
// This test ensures that the S3Store will cleanup all files that it creates during | ||
// a call to WriteChunk, even if an error occurs during that invocation. | ||
// Here, we provide 14 bytes to WriteChunk and since the PartSize is set to 10, | ||
// it will split the input into two parts (10 bytes and 4 bytes). | ||
// Inside the first call to UploadPartWithContext, we assert that the temporary files | ||
// for both parts have been created and we return an error. | ||
// In the end, we assert that the error bubbled up and that all temporary files have | ||
// been cleaned up. | ||
func TestWriteChunkCleansUpTempFiles(t *testing.T) { | ||
mockCtrl := gomock.NewController(t) | ||
defer mockCtrl.Finish() | ||
assert := assert.New(t) | ||
|
||
// Create a temporary directory, so no files get mixed in. | ||
tempDir, err := ioutil.TempDir("", "tusd-s3-cleanup-tests-") | ||
assert.Nil(err) | ||
|
||
s3obj := NewMockS3API(mockCtrl) | ||
s3api := s3APIWithTempFileAssertion{ | ||
MockS3API: s3obj, | ||
assert: assert, | ||
tempDir: tempDir, | ||
} | ||
store := New("bucket", s3api) | ||
store.MaxPartSize = 10 | ||
store.MinPartSize = 10 | ||
store.MaxMultipartParts = 10000 | ||
store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 | ||
store.TemporaryDirectory = tempDir | ||
|
||
// The usual S3 calls for retrieving the upload | ||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ | ||
Bucket: aws.String("bucket"), | ||
Key: aws.String("uploadId.info"), | ||
}).Return(&s3.GetObjectOutput{ | ||
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), | ||
}, nil) | ||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ | ||
Bucket: aws.String("bucket"), | ||
Key: aws.String("uploadId"), | ||
UploadId: aws.String("multipartId"), | ||
PartNumberMarker: aws.Int64(0), | ||
}).Return(&s3.ListPartsOutput{ | ||
Parts: []*s3.Part{ | ||
{ | ||
Size: aws.Int64(100), | ||
}, | ||
{ | ||
Size: aws.Int64(200), | ||
}, | ||
}, | ||
}, nil).Times(2) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
👍 It's out of scope here, but I think there are a number of places where we could combine duplicate |
||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ | ||
Bucket: aws.String("bucket"), | ||
Key: aws.String("uploadId.part"), | ||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)) | ||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ | ||
Bucket: aws.String("bucket"), | ||
Key: aws.String("uploadId.part"), | ||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)) | ||
|
||
// No calls to s3obj.EXPECT().UploadPartWithContext since that is handled by s3APIWithTempFileAssertion | ||
|
||
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") | ||
assert.Nil(err) | ||
|
||
bytesRead, err := upload.WriteChunk(context.Background(), 300, bytes.NewReader([]byte("1234567890ABCD"))) | ||
assert.NotNil(err) | ||
assert.Equal(err.Error(), "not now") | ||
assert.Equal(int64(0), bytesRead) | ||
|
||
files, err := ioutil.ReadDir(tempDir) | ||
assert.Nil(err) | ||
assert.Equal(len(files), 0) | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching these 👍