S3Store: Concurrently write upload parts to S3 while reading from client #402

Merged on Jul 29, 2020 (22 commits)

Changes from 5 commits

Commits
87045a8
Allow empty metadata values
JenSte Jun 20, 2020
6c45ff2
Make tests less fragile by allowing loose call ordering
acj Feb 28, 2020
2b3fb49
Add s3ChunkProducer
acj Mar 16, 2020
54819d8
Integrate s3ChunkProducer to support chunk buffering
acj Mar 16, 2020
b72a4d4
Remove completed chunk files inline to reduce disk space usage
acj Mar 16, 2020
8d8046c
Add tests for chunk producer
acj Mar 20, 2020
ec5f500
docs: Use value from Host header to forward to tusd
Acconut Jun 24, 2020
08a72a5
Use int64 for MaxBufferedParts field
acj Jul 14, 2020
c51afa1
Default to 20 buffered parts
acj Jul 14, 2020
ff63ab4
Rename s3ChunkProducer -> s3PartProducer
acj Jul 14, 2020
46e0e9c
Document s3PartProducer struct
acj Jul 14, 2020
5dd7c3b
Clarify misleading comment
acj Jul 14, 2020
be6cf54
Revert "Remove completed chunk files inline to reduce disk space usage"
acj Jul 14, 2020
74c5c0c
Remove redundant seek
acj Jul 14, 2020
d014a3e
Clean up any remaining files in the channel when we return
acj Jul 14, 2020
9cb1385
Make putPart* functions responsible for cleaning up temp files
acj Jul 14, 2020
59c3d42
Merge branch 'metadata' of https://github.com/JenSte/tusd
Acconut Jul 15, 2020
26b84bc
handler: Add tests for empty metadata pairs
Acconut Jul 15, 2020
a23a1af
Merge branch 's3store-buffered-chunks' of https://github.com/acj/tusd…
Acconut Jul 18, 2020
b79c64f
Factor out cleanUpTempFile func
acj Jul 20, 2020
2cb30a4
Merge branch 's3store-buffered-chunks' of https://github.com/acj/tusd…
Acconut Jul 22, 2020
6984744
Add test to ensure that temporary files get cleaned up
Acconut Jul 22, 2020
118 changes: 92 additions & 26 deletions pkg/s3store/s3store.go
@@ -135,6 +135,11 @@ type S3Store struct {
// MaxObjectSize is the maximum size an S3 Object can have according to S3
// API specifications. See link above.
MaxObjectSize int64
// MaxBufferedParts is the number of additional parts that can be received from
// the client and stored on disk while a part is being uploaded to S3. This
// can help improve throughput by not blocking the client while tusd is
// communicating with the S3 API, which can have unpredictable latency.
MaxBufferedParts int
}
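To illustrate the new knob: below is a minimal sketch of how a server embedding tusd could enable part buffering, assuming the existing s3store.New constructor and an aws-sdk-go client. The bucket name, region, and the value 20 (the default a later commit in this PR introduces) are placeholders, not part of this diff.

```go
package main

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"

	"github.com/tus/tusd/pkg/s3store"
)

func newStore() s3store.S3Store {
	// S3 client that the store uses for its multipart API calls.
	sess := session.Must(session.NewSession(&aws.Config{
		Region: aws.String("us-east-1"),
	}))
	store := s3store.New("my-upload-bucket", s3.New(sess))

	// Buffer up to 20 additional parts on disk while one part is in
	// flight to S3, instead of blocking the client between parts.
	store.MaxBufferedParts = 20

	return store
}
```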

type S3API interface {
@@ -272,6 +277,70 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) error {
return err
}

type s3ChunkProducer struct {
// files is the channel that finished chunk files are sent on for uploading
files chan<- *os.File
// done is closed by the consumer to tell the producer to stop
done chan struct{}
// err holds the first error encountered while producing chunks
err error
// r is the reader that upload data is consumed from
r io.Reader
}

func (scp *s3ChunkProducer) produce(chunkSize int64) {
for {
file, err := scp.nextChunk(chunkSize)
if err != nil {
scp.err = err
close(scp.files)
return
}
if file == nil {
close(scp.files)
return
}
select {
case scp.files <- file:
case <-scp.done:
close(scp.files)
return
}
}
}

func (scp *s3ChunkProducer) nextChunk(size int64) (*os.File, error) {
// Create a temporary file to store the part
file, err := ioutil.TempFile("", "tusd-s3-tmp-")
if err != nil {
return nil, err
}

limitedReader := io.LimitReader(scp.r, size)
n, err := io.Copy(file, limitedReader)

// If the HTTP PATCH request gets interrupted in the middle (e.g. because
// the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
// However, for S3Store it's not important whether the stream has ended
// on purpose or accidentally. Therefore, we ignore this error so that the
// remaining chunk can still be stored on S3.
if err == io.ErrUnexpectedEOF {
err = nil
}

if err != nil {
return nil, err
}

// io.Copy returns (0, nil) when it reaches EOF
if n == 0 {
os.Remove(file.Name())
file.Close()
return nil, nil
}

// Seek to the beginning of the file
file.Seek(0, 0)

return file, nil
}
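The select inside produce above is the standard Go idiom for a cancellable send: the producer blocks until either the consumer accepts the next file or the consumer closes done. Below is a self-contained reduction of that producer/consumer shape using ints instead of *os.File; all names are illustrative and none come from this diff. As with the real producer, a few buffered values may still arrive after cancellation before the channel is closed.

```go
package main

import "fmt"

// produce sends values until done is closed, then closes out so the
// consumer's range loop terminates.
func produce(out chan<- int, done <-chan struct{}) {
	defer close(out)
	for i := 0; ; i++ {
		select {
		case out <- i: // consumer (or the channel buffer) accepted the value
		case <-done: // consumer asked the producer to stop
			return
		}
	}
}

func main() {
	// The buffer plays the role of MaxBufferedParts: the producer can run
	// ahead of the consumer by at most this many values.
	out := make(chan int, 3)
	done := make(chan struct{})

	go produce(out, done)

	for v := range out {
		fmt.Println(v)
		if v == 5 {
			close(done) // tell the producer to stop; drain whatever is left
		}
	}
}
```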

func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
id := upload.id
store := upload.store
@@ -315,35 +384,23 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
src = io.MultiReader(incompletePartFile, src)
}

- for {
- // Create a temporary file to store the part in it
- file, err := ioutil.TempFile("", "tusd-s3-tmp-")
- if err != nil {
- return bytesUploaded, err
- }
- defer os.Remove(file.Name())
- defer file.Close()
-
- limitedReader := io.LimitReader(src, optimalPartSize)
- n, err := io.Copy(file, limitedReader)
-
- // If the HTTP PATCH request gets interrupted in the middle (e.g. because
- // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
- // However, for S3Store it's not important whether the stream has ended
- // on purpose or accidentally. Therefore, we ignore this error to not
- // prevent the remaining chunk to be stored on S3.
- if err == io.ErrUnexpectedEOF {
- err = nil
- }
fileChan := make(chan *os.File, store.MaxBufferedParts)
doneChan := make(chan struct{})
defer close(doneChan)

- // io.Copy does not return io.EOF, so we not have to handle it differently.
chunkProducer := s3ChunkProducer{
done: doneChan,
files: fileChan,
r: src,
}
go chunkProducer.produce(optimalPartSize)

for file := range fileChan {
stat, err := file.Stat()
if err != nil {
- return bytesUploaded, err
- }
- // If io.Copy is finished reading, it will always return (0, nil).
- if n == 0 {
- return (bytesUploaded - incompletePartSize), nil
return 0, err
}
n := stat.Size()

// Seek to the beginning of the file
file.Seek(0, 0)
@@ -373,7 +430,16 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
offset += n
bytesUploaded += n
nextPartNum += 1

if err := os.Remove(file.Name()); err != nil {
return bytesUploaded, err
}
if err := file.Close(); err != nil {
return bytesUploaded, err
}
}

return bytesUploaded - incompletePartSize, chunkProducer.err
}

func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) {
155 changes: 155 additions & 0 deletions pkg/s3store/s3store_chunk_producer_test.go
@@ -0,0 +1,155 @@
package s3store

import (
"errors"
"os"
"strings"
"testing"
"time"
)

type InfiniteZeroReader struct{}

func (izr InfiniteZeroReader) Read(b []byte) (int, error) {
b[0] = 0
return 1, nil
}

type ErrorReader struct{}

func (ErrorReader) Read(b []byte) (int, error) {
return 0, errors.New("error from ErrorReader")
}

func TestChunkProducerConsumesEntireReaderWithoutError(t *testing.T) {
fileChan := make(chan *os.File)
doneChan := make(chan struct{})
expectedStr := "test"
r := strings.NewReader(expectedStr)
cp := s3ChunkProducer{
done: doneChan,
files: fileChan,
r: r,
}
go cp.produce(1)

actualStr := ""
b := make([]byte, 1)
for f := range fileChan {
n, err := f.Read(b)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if n != 1 {
t.Fatalf("incorrect number of bytes read: wanted %d, got %d", 1, n)
}
actualStr += string(b)

os.Remove(f.Name())
f.Close()
}

if actualStr != expectedStr {
t.Errorf("incorrect string read from channel: wanted %s, got %s", expectedStr, actualStr)
}

if cp.err != nil {
t.Errorf("unexpected error from chunk producer: %s", cp.err)
}
}

func TestChunkProducerExitsWhenDoneChannelIsClosed(t *testing.T) {
fileChan := make(chan *os.File)
doneChan := make(chan struct{})
cp := s3ChunkProducer{
done: doneChan,
files: fileChan,
r: InfiniteZeroReader{},
}

completedChan := make(chan struct{})
go func() {
cp.produce(10)
completedChan <- struct{}{}
}()

close(doneChan)

select {
case <-completedChan:
// producer exited cleanly
case <-time.After(2 * time.Second):
t.Error("timed out waiting for producer to exit")
}

safelyDrainChannelOrFail(fileChan, t)
}

func TestChunkProducerExitsWhenDoneChannelIsClosedBeforeAnyChunkIsSent(t *testing.T) {
fileChan := make(chan *os.File)
doneChan := make(chan struct{})
cp := s3ChunkProducer{
done: doneChan,
files: fileChan,
r: InfiniteZeroReader{},
}

close(doneChan)

completedChan := make(chan struct{})
go func() {
cp.produce(10)
completedChan <- struct{}{}
}()

select {
case <-completedChan:
// producer exited cleanly
case <-time.After(2 * time.Second):
t.Error("timed out waiting for producer to exit")
}

safelyDrainChannelOrFail(fileChan, t)
}

func TestChunkProducerExitsWhenUnableToReadFromFile(t *testing.T) {
fileChan := make(chan *os.File)
doneChan := make(chan struct{})
cp := s3ChunkProducer{
done: doneChan,
files: fileChan,
r: ErrorReader{},
}

completedChan := make(chan struct{})
go func() {
cp.produce(10)
completedChan <- struct{}{}
}()

select {
case <-completedChan:
// producer exited cleanly
case <-time.After(2 * time.Second):
t.Error("timed out waiting for producer to exit")
}

safelyDrainChannelOrFail(fileChan, t)

if cp.err == nil {
t.Error("expected an error but didn't get one")
}
}

func safelyDrainChannelOrFail(c chan *os.File, t *testing.T) {
// At this point, we've signaled that the producer should exit, but it may write a few files
// into the channel before closing it and exiting. Make sure that we get a nil value
// eventually.
for i := 0; i < 100; i++ {
if f := <-c; f == nil {
return
}
}

t.Fatal("timed out waiting for channel to drain")
}
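The drain helper above leans on a language guarantee: once the producer closes the files channel, every further receive completes immediately and yields the element type's zero value, which for *os.File is nil. That is why checking for a nil file is enough to know the channel is closed and empty. A tiny standalone illustration of that guarantee (not part of the test suite):

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	c := make(chan *os.File, 1)
	close(c)

	// A receive from a closed, empty channel never blocks: it returns the
	// zero value (nil here) and ok == false.
	f, ok := <-c
	fmt.Println(f, ok) // prints: <nil> false
}
```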