diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index cb876880..35967aaf 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -92,6 +92,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/aws/aws-sdk-go-v2/aws" + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/smithy-go" @@ -263,6 +264,7 @@ func (store S3Store) UseIn(composer *handler.StoreComposer) { composer.UseTerminater(store) composer.UseConcater(store) composer.UseLengthDeferrer(store) + composer.UseContentServer(store) } func (store S3Store) RegisterMetrics(registry prometheus.Registerer) { @@ -385,51 +387,6 @@ func (store S3Store) AsServableUpload(upload handler.Upload) handler.ServableUpl return upload.(*s3Upload) } -func (su *s3Upload) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error { - // Get file info - info, err := su.GetInfo(ctx) - if err != nil { - return err - } - - // Prepare GetObject input - input := &s3.GetObjectInput{ - Bucket: aws.String(su.store.Bucket), - Key: su.store.keyWithPrefix(su.objectId), - } - - // Forward the Range header if present - if rangeHeader := r.Header.Get("Range"); rangeHeader != "" { - input.Range = aws.String(rangeHeader) - } - - // Let S3 handle the request - result, err := su.store.Service.GetObject(ctx, input) - if err != nil { - return err - } - defer result.Body.Close() - - // Set headers - w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10)) - w.Header().Set("Content-Type", info.MetaData["filetype"]) - w.Header().Set("ETag", *result.ETag) - - // Add Content-Disposition if present in S3 response - if result.ContentDisposition != nil { - w.Header().Set("Content-Disposition", *result.ContentDisposition) - } - - // Add Content-Encoding if present in S3 response - if result.ContentEncoding != nil { - w.Header().Set("Content-Encoding", *result.ContentEncoding) - } - - // Stream the content - _, err = io.Copy(w, result.Body) - return err -} - func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) error { store := upload.store @@ -1080,6 +1037,132 @@ func (upload *s3Upload) DeclareLength(ctx context.Context, length int64) error { return upload.writeInfo(ctx, info) } +// TODO: Move this into its own file. +func (su *s3Upload) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error { + // TODO: If the upload is not yet finished, we don't even have to try to get the object. + + input := &s3.GetObjectInput{ + Bucket: aws.String(su.store.Bucket), + Key: su.store.keyWithPrefix(su.objectId), + } + + // Forward the Range, If-Match, If-None-Match, If-Unmodified-Since, If-Modified-Since headers if present + if val := r.Header.Get("Range"); val != "" { + input.Range = aws.String(val) + } + if val := r.Header.Get("If-Match"); val != "" { + input.IfMatch = aws.String(val) + } + if val := r.Header.Get("If-None-Match"); val != "" { + input.IfNoneMatch = aws.String(val) + } + if val := r.Header.Get("If-Modified-Since"); val != "" { + t, err := http.ParseTime(val) + if err == nil { + input.IfModifiedSince = aws.Time(t) + } + } + if val := r.Header.Get("If-Unmodified-Since"); val != "" { + t, err := http.ParseTime(val) + if err == nil { + input.IfUnmodifiedSince = aws.Time(t) + } + } + + // Let S3 handle the request + result, err := su.store.Service.GetObject(ctx, input) + if err != nil { + // Delete the headers set by tusd's handler. We don't need them for errors. + w.Header().Del("Content-Type") + w.Header().Del("Content-Disposition") + + var respErr *awshttp.ResponseError + if errors.As(err, &respErr) { + if respErr.HTTPStatusCode() == http.StatusNotFound || respErr.HTTPStatusCode() == http.StatusForbidden { + return handler.ErrNotFound + } + + if respErr.HTTPStatusCode() == http.StatusNotModified { + // Content-Location, Date, ETag, Vary, Cache-Control and Expires should be set + // for 304 Not Modified responses. See https://httpwg.org/specs/rfc9110.html#status.304 + for _, header := range []string{"Content-Location", "Date", "ETag", "Vary", "Cache-Control", "Expires"} { + fmt.Println("header", respErr.Response.Header) + fmt.Println("header", header) + if val := respErr.Response.Header.Get(header); val != "" { + fmt.Println("header", header, val) + w.Header().Set(header, val) + } + } + + // TODO: Return HTTPErrors, so that tusd can log them. + w.WriteHeader(http.StatusNotModified) + return nil + } + + if respErr.HTTPStatusCode() == http.StatusRequestedRangeNotSatisfiable { + // Content-Range should be set for 416 Request Range Not Satisfiable responses. + // See https://httpwg.org/specs/rfc9110.html#status.304 + // Note: AWS S3 does not seem to include this header in its response. + if val := respErr.Response.Header.Get("Content-Range"); val != "" { + w.Header().Set("Content-Range", val) + } + + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return nil + } + } + return err + } + defer result.Body.Close() + + // Add Accept-Ranges,Content-*, Cache-Control, ETag, Expires, Last-Modified headers if present in S3 response + if result.AcceptRanges != nil { + w.Header().Set("Accept-Ranges", *result.AcceptRanges) + } + if result.ContentDisposition != nil { + w.Header().Set("Content-Disposition", *result.ContentDisposition) + } + if result.ContentEncoding != nil { + w.Header().Set("Content-Encoding", *result.ContentEncoding) + } + if result.ContentLanguage != nil { + w.Header().Set("Content-Language", *result.ContentLanguage) + } + if result.ContentLength != nil { + w.Header().Set("Content-Length", strconv.FormatInt(*result.ContentLength, 10)) + } + if result.ContentRange != nil { + w.Header().Set("Content-Range", *result.ContentRange) + } + if result.ContentType != nil { + w.Header().Set("Content-Type", *result.ContentType) + } + if result.CacheControl != nil { + w.Header().Set("Cache-Control", *result.CacheControl) + } + if result.ETag != nil { + w.Header().Set("ETag", *result.ETag) + } + if result.ExpiresString != nil { + w.Header().Set("Expires", *result.ExpiresString) + } + if result.LastModified != nil { + w.Header().Set("Last-Modified", result.LastModified.Format(http.TimeFormat)) + } + + statusCode := http.StatusOK + if result.ContentRange != nil { + // Use 206 Partial Content for range requests + statusCode = http.StatusPartialContent + } else if result.ContentLength != nil && *result.ContentLength == 0 { + statusCode = http.StatusNoContent + } + w.WriteHeader(statusCode) + + _, err = io.Copy(w, result.Body) + return err +} + func (store S3Store) listAllParts(ctx context.Context, objectId string, multipartId string) (parts []*s3Part, err error) { var partMarker *string for { diff --git a/pkg/s3store/s3store_test.go b/pkg/s3store/s3store_test.go index 70461a4d..17edc036 100644 --- a/pkg/s3store/s3store_test.go +++ b/pkg/s3store/s3store_test.go @@ -17,9 +17,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/aws/aws-sdk-go-v2/aws" + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/smithy-go" + smithyhttp "github.com/aws/smithy-go/transport/http" "github.com/tus/tusd/v2/pkg/handler" ) @@ -1510,13 +1512,17 @@ func TestS3ServableUploadServeContent(t *testing.T) { multipartId: "multipartId", } + // TODO: Should we initialize the upload with GetUpload? + s3obj.EXPECT().GetObject(gomock.Any(), &s3.GetObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), }).Return(&s3.GetObjectOutput{ Body: io.NopCloser(strings.NewReader("test content")), ContentLength: aws.Int64(100), + ContentType: aws.String("text/plain"), ETag: aws.String("etag123"), + CacheControl: aws.String("max-age=3600"), }, nil) servableUpload := store.AsServableUpload(upload) @@ -1531,6 +1537,7 @@ func TestS3ServableUploadServeContent(t *testing.T) { assert.Equal("100", w.Header().Get("Content-Length")) assert.Equal("text/plain", w.Header().Get("Content-Type")) assert.Equal("etag123", w.Header().Get("ETag")) + assert.Equal("max-age=3600", w.Header().Get("Cache-Control")) assert.Equal("test content", w.Body.String()) } @@ -1556,6 +1563,8 @@ func TestS3ServableUploadServeContentWithRange(t *testing.T) { }).Return(&s3.GetObjectOutput{ Body: io.NopCloser(strings.NewReader("0123456789")), ContentLength: aws.Int64(10), + ContentRange: aws.String("bytes 10-19/100"), + ContentType: aws.String("text/plain"), ETag: aws.String("etag123"), }, nil) @@ -1576,7 +1585,7 @@ func TestS3ServableUploadServeContentWithRange(t *testing.T) { assert.Equal("0123456789", w.Body.String()) } -func TestS3ServableUploadServeContentError(t *testing.T) { +func TestS3ServableUploadServeContentInternalError(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() assert := assert.New(t) @@ -1592,7 +1601,10 @@ func TestS3ServableUploadServeContentError(t *testing.T) { } expectedError := errors.New("S3 error") - s3obj.EXPECT().GetObject(gomock.Any(), gomock.Any()).Return(nil, expectedError) + s3obj.EXPECT().GetObject(gomock.Any(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + }).Return(nil, expectedError) servableUpload := store.AsServableUpload(upload) @@ -1602,3 +1614,134 @@ func TestS3ServableUploadServeContentError(t *testing.T) { err := servableUpload.ServeContent(context.Background(), w, r) assert.Equal(expectedError, err) } + +func TestS3ServableUploadServeContentNotFound(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + + upload := &s3Upload{ + store: &store, + info: &handler.FileInfo{Size: 100, Offset: 100, MetaData: map[string]string{"filetype": "text/plain"}}, + objectId: "uploadId", + multipartId: "multipartId", + } + + s3obj.EXPECT().GetObject(gomock.Any(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + }).Return(nil, &awshttp.ResponseError{ + ResponseError: &smithyhttp.ResponseError{ + Response: &smithyhttp.Response{ + Response: &http.Response{ + StatusCode: http.StatusNotFound, + }, + }, + }, + }) + + servableUpload := store.AsServableUpload(upload) + + w := httptest.NewRecorder() + r := httptest.NewRequest("GET", "/", nil) + + err := servableUpload.ServeContent(context.Background(), w, r) + assert.Equal(handler.ErrNotFound, err) +} + +func TestS3ServableUploadServeContentRangeNotSatisfiable(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + + upload := &s3Upload{ + store: &store, + info: &handler.FileInfo{Size: 100, Offset: 100, MetaData: map[string]string{"filetype": "text/plain"}}, + objectId: "uploadId", + multipartId: "multipartId", + } + + r := httptest.NewRequest("GET", "/", nil) + r.Header.Set("Range", "bytes=200-300") + + s3obj.EXPECT().GetObject(gomock.Any(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + Range: aws.String("bytes=200-300"), + }).Return(nil, &awshttp.ResponseError{ + ResponseError: &smithyhttp.ResponseError{ + Response: &smithyhttp.Response{ + Response: &http.Response{ + StatusCode: http.StatusRequestedRangeNotSatisfiable, + Header: http.Header{ + "Content-Range": []string{"bytes */100"}, + }, + }, + }, + }, + }) + + servableUpload := store.AsServableUpload(upload) + w := httptest.NewRecorder() + + err := servableUpload.ServeContent(context.Background(), w, r) + assert.NoError(err) + assert.Equal(http.StatusRequestedRangeNotSatisfiable, w.Code) + assert.Equal("bytes */100", w.Header().Get("Content-Range")) +} + +func TestS3ServableUploadServeContentNotModified(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + + upload := &s3Upload{ + store: &store, + info: &handler.FileInfo{Size: 100, Offset: 100, MetaData: map[string]string{"filetype": "text/plain"}}, + objectId: "uploadId", + multipartId: "multipartId", + } + + r := httptest.NewRequest("GET", "/", nil) + r.Header.Set("If-None-Match", `"some-etag"`) + + s3obj.EXPECT().GetObject(gomock.Any(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + IfNoneMatch: aws.String(`"some-etag"`), + }).Return(nil, &awshttp.ResponseError{ + ResponseError: &smithyhttp.ResponseError{ + Response: &smithyhttp.Response{ + Response: &http.Response{ + StatusCode: http.StatusNotModified, + Header: http.Header{ + // We intentionally set Etag instead of ETag because Go's + // textproto.CanonicalMIMEHeaderKey normalizes it that way. + "Etag": []string{`"some-other-etag"`}, + "Cache-Control": []string{"max-age=3600"}, + "Date": []string{"Wed, 21 Oct 2015 07:28:00 GMT"}, + }, + }, + }, + }, + }) + + servableUpload := store.AsServableUpload(upload) + w := httptest.NewRecorder() + + err := servableUpload.ServeContent(context.Background(), w, r) + assert.NoError(err) + assert.Equal(http.StatusNotModified, w.Code) + assert.Equal(`"some-other-etag"`, w.Header().Get("ETag")) + assert.Equal("max-age=3600", w.Header().Get("Cache-Control")) + assert.Equal("Wed, 21 Oct 2015 07:28:00 GMT", w.Header().Get("Date")) +}