From b8759471f079f231c8a5a48797fdba771cd1729f Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Fri, 20 Nov 2020 13:20:25 +0100 Subject: [PATCH] Data transfers package (#1321) --- changelog/unreleased/http-datatx.md | 11 ++ cmd/reva/download.go | 39 +++-- cmd/reva/upload.go | 58 ++++--- cmd/revad/runtime/loader.go | 1 + go.mod | 2 +- go.sum | 2 + .../grpc/services/gateway/storageprovider.go | 164 ++++++++++-------- .../publicstorageprovider.go | 46 +++-- .../storageprovider/storageprovider.go | 51 ++++-- .../services/dataprovider/dataprovider.go | 136 ++++++--------- internal/http/services/dataprovider/get.go | 62 ------- internal/http/services/owncloud/ocdav/copy.go | 22 ++- internal/http/services/owncloud/ocdav/get.go | 12 +- internal/http/services/owncloud/ocdav/put.go | 11 +- internal/http/services/owncloud/ocdav/tus.go | 20 ++- .../put.go => pkg/rhttp/datatx/datatx.go | 33 +--- pkg/rhttp/datatx/manager/loader/loader.go | 26 +++ pkg/rhttp/datatx/manager/registry/registry.go | 34 ++++ pkg/rhttp/datatx/manager/simple/simple.go | 125 +++++++++++++ pkg/rhttp/datatx/manager/tus/tus.go | 152 ++++++++++++++++ pkg/sdk/action/action_test.go | 114 ------------ pkg/sdk/action/download.go | 26 ++- pkg/sdk/action/upload.go | 37 ++-- pkg/sdk/sdk_test.go | 74 -------- pkg/storage/fs/eosgrpc/eosgrpc.go | 6 +- pkg/storage/fs/ocis/upload.go | 19 +- pkg/storage/fs/owncloud/upload.go | 17 +- pkg/storage/fs/s3/upload.go | 6 +- pkg/storage/storage.go | 2 +- pkg/storage/utils/eosfs/upload.go | 6 +- pkg/storage/utils/localfs/upload.go | 17 +- 31 files changed, 772 insertions(+), 559 deletions(-) create mode 100644 changelog/unreleased/http-datatx.md delete mode 100644 internal/http/services/dataprovider/get.go rename internal/http/services/dataprovider/put.go => pkg/rhttp/datatx/datatx.go (51%) create mode 100644 pkg/rhttp/datatx/manager/loader/loader.go create mode 100644 pkg/rhttp/datatx/manager/registry/registry.go create mode 100644 pkg/rhttp/datatx/manager/simple/simple.go create mode 100644 pkg/rhttp/datatx/manager/tus/tus.go delete mode 100644 pkg/sdk/action/action_test.go delete mode 100644 pkg/sdk/sdk_test.go diff --git a/changelog/unreleased/http-datatx.md b/changelog/unreleased/http-datatx.md new file mode 100644 index 0000000000..2581a54551 --- /dev/null +++ b/changelog/unreleased/http-datatx.md @@ -0,0 +1,11 @@ +Enhancement: Add support for multiple data transfer protocols + +Previously, we had to configure which data transfer protocol to use in the +dataprovider service. A previous PR added the functionality to redirect requests +to different handlers based on the request method but that would lead to +conflicts if multiple protocols don't support mutually exclusive sets of +requests. This PR adds the functionality to have multiple such handlers +simultaneously and the client can choose which protocol to use. + +https://github.com/cs3org/reva/pull/1321 +https://github.com/cs3org/reva/pull/1285/ diff --git a/cmd/reva/download.go b/cmd/reva/download.go index bdcf67be98..4c0c6f0f58 100644 --- a/cmd/reva/download.go +++ b/cmd/reva/download.go @@ -26,9 +26,9 @@ import ( "time" "github.com/cheggaaa/pb" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/internal/http/services/datagateway" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rhttp" @@ -86,23 +86,28 @@ func downloadCommand() *command { return formatError(res.Status) } + p, err := getDownloadProtocolInfo(res.Protocols, "simple") + if err != nil { + return err + } + // TODO(labkode): upload to data server - fmt.Printf("Downloading from: %s\n", res.DownloadEndpoint) + fmt.Printf("Downloading from: %s\n", p.DownloadEndpoint) - content, err := checkDownloadWebdavRef(res.DownloadEndpoint, res.Opaque) + content, err := checkDownloadWebdavRef(res.Protocols) if err != nil { if _, ok := err.(errtypes.IsNotSupported); !ok { return err } - dataServerURL := res.DownloadEndpoint + dataServerURL := p.DownloadEndpoint // TODO(labkode): do a protocol switch httpReq, err := rhttp.NewRequest(ctx, "GET", dataServerURL, nil) if err != nil { return err } - httpReq.Header.Set(datagateway.TokenTransportHeader, res.Token) + httpReq.Header.Set(datagateway.TokenTransportHeader, p.Token) httpClient := rhttp.GetHTTPClient( rhttp.Context(ctx), // TODO make insecure configurable @@ -145,13 +150,27 @@ func downloadCommand() *command { return cmd } -func checkDownloadWebdavRef(endpoint string, opaque *typespb.Opaque) (io.Reader, error) { - if opaque == nil { +func getDownloadProtocolInfo(protocolInfos []*gateway.FileDownloadProtocol, protocol string) (*gateway.FileDownloadProtocol, error) { + for _, p := range protocolInfos { + if p.Protocol == protocol { + return p, nil + } + } + return nil, errtypes.NotFound(protocol) +} + +func checkDownloadWebdavRef(protocols []*gateway.FileDownloadProtocol) (io.Reader, error) { + p, err := getDownloadProtocolInfo(protocols, "simple") + if err != nil { + return nil, err + } + + if p.Opaque == nil { return nil, errtypes.NotSupported("opaque object not defined") } var token string - tokenOpaque, ok := opaque.Map["webdav-token"] + tokenOpaque, ok := p.Opaque.Map["webdav-token"] if !ok { return nil, errtypes.NotSupported("webdav token not defined") } @@ -163,7 +182,7 @@ func checkDownloadWebdavRef(endpoint string, opaque *typespb.Opaque) (io.Reader, } var filePath string - fileOpaque, ok := opaque.Map["webdav-file-path"] + fileOpaque, ok := p.Opaque.Map["webdav-file-path"] if !ok { return nil, errtypes.NotSupported("webdav file path not defined") } @@ -174,7 +193,7 @@ func checkDownloadWebdavRef(endpoint string, opaque *typespb.Opaque) (io.Reader, return nil, errors.New("opaque entry decoder not recognized: " + fileOpaque.Decoder) } - c := gowebdav.NewClient(endpoint, "", "") + c := gowebdav.NewClient(p.DownloadEndpoint, "", "") c.SetHeader(tokenpkg.TokenHeader, token) reader, err := c.ReadStream(filePath) diff --git a/cmd/reva/upload.go b/cmd/reva/upload.go index e3929a3b4a..63c1968332 100644 --- a/cmd/reva/upload.go +++ b/cmd/reva/upload.go @@ -30,6 +30,7 @@ import ( "github.com/cs3org/reva/internal/http/services/datagateway" "github.com/pkg/errors" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" @@ -50,11 +51,11 @@ func uploadCommand() *command { cmd := newCommand("upload") cmd.Description = func() string { return "upload a local file to the remote server" } cmd.Usage = func() string { return "Usage: upload [-flags] " } - disableTusFlag := cmd.Bool("disable-tus", false, "whether to disable tus protocol") + protocolFlag := cmd.String("protocol", "tus", "the protocol to be used for uploads") xsFlag := cmd.String("xs", "negotiate", "compute checksum") cmd.ResetFlags = func() { - *disableTusFlag, *xsFlag = false, "negotiate" + *protocolFlag, *xsFlag = "tus", "negotiate" } cmd.Action = func(w ...io.Writer) error { @@ -115,11 +116,7 @@ func uploadCommand() *command { return formatError(res.Status) } - // TODO(labkode): upload to data server - fmt.Printf("Data server: %s\n", res.UploadEndpoint) - fmt.Printf("Allowed checksums: %+v\n", res.AvailableChecksums) - - if err = checkUploadWebdavRef(res.UploadEndpoint, res.Opaque, md, fd); err != nil { + if err = checkUploadWebdavRef(res.Protocols, md, fd); err != nil { if _, ok := err.(errtypes.IsNotSupported); !ok { return err } @@ -127,7 +124,15 @@ func uploadCommand() *command { return nil } - xsType, err := guessXS(*xsFlag, res.AvailableChecksums) + p, err := getUploadProtocolInfo(res.Protocols, *protocolFlag) + if err != nil { + return err + } + + fmt.Printf("Data server: %s\n", p.UploadEndpoint) + fmt.Printf("Allowed checksums: %+v\n", p.AvailableChecksums) + + xsType, err := guessXS(*xsFlag, p.AvailableChecksums) if err != nil { return err } @@ -144,15 +149,15 @@ func uploadCommand() *command { return err } - dataServerURL := res.UploadEndpoint + dataServerURL := p.UploadEndpoint - if *disableTusFlag { + if *protocolFlag == "simple" { httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, fd) if err != nil { return err } - httpReq.Header.Set(datagateway.TokenTransportHeader, res.Token) + httpReq.Header.Set(datagateway.TokenTransportHeader, p.Token) q := httpReq.URL.Query() q.Add("xs", xs) q.Add("xs_type", storageprovider.GRPC2PKGXS(xsType).String()) @@ -178,9 +183,7 @@ func uploadCommand() *command { if token, ok := tokenpkg.ContextGetToken(ctx); ok { c.Header.Add(tokenpkg.TokenHeader, token) } - if res.Token != "" { - c.Header.Add(datagateway.TokenTransportHeader, res.Token) - } + c.Header.Add(datagateway.TokenTransportHeader, p.Token) tusc, err := tus.NewClient(dataServerURL, c) if err != nil { return err @@ -233,13 +236,27 @@ func uploadCommand() *command { return cmd } -func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInfo, fd *os.File) error { - if opaque == nil { +func getUploadProtocolInfo(protocolInfos []*gateway.FileUploadProtocol, protocol string) (*gateway.FileUploadProtocol, error) { + for _, p := range protocolInfos { + if p.Protocol == protocol { + return p, nil + } + } + return nil, errtypes.NotFound(protocol) +} + +func checkUploadWebdavRef(protocols []*gateway.FileUploadProtocol, md os.FileInfo, fd *os.File) error { + p, err := getUploadProtocolInfo(protocols, "simple") + if err != nil { + return err + } + + if p.Opaque == nil { return errtypes.NotSupported("opaque object not defined") } var token string - tokenOpaque, ok := opaque.Map["webdav-token"] + tokenOpaque, ok := p.Opaque.Map["webdav-token"] if !ok { return errtypes.NotSupported("webdav token not defined") } @@ -251,7 +268,7 @@ func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInf } var filePath string - fileOpaque, ok := opaque.Map["webdav-file-path"] + fileOpaque, ok := p.Opaque.Map["webdav-file-path"] if !ok { return errtypes.NotSupported("webdav file path not defined") } @@ -262,12 +279,11 @@ func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInf return errors.New("opaque entry decoder not recognized: " + fileOpaque.Decoder) } - c := gowebdav.NewClient(endpoint, "", "") + c := gowebdav.NewClient(p.UploadEndpoint, "", "") c.SetHeader(tokenpkg.TokenHeader, token) c.SetHeader("Upload-Length", strconv.FormatInt(md.Size(), 10)) - err := c.WriteStream(filePath, fd, 0700) - if err != nil { + if err = c.WriteStream(filePath, fd, 0700); err != nil { return err } diff --git a/cmd/revad/runtime/loader.go b/cmd/revad/runtime/loader.go index bdaa29b2a9..eab2d41111 100644 --- a/cmd/revad/runtime/loader.go +++ b/cmd/revad/runtime/loader.go @@ -35,6 +35,7 @@ import ( _ "github.com/cs3org/reva/pkg/ocm/provider/authorizer/loader" _ "github.com/cs3org/reva/pkg/ocm/share/manager/loader" _ "github.com/cs3org/reva/pkg/publicshare/manager/loader" + _ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/loader" _ "github.com/cs3org/reva/pkg/share/manager/loader" _ "github.com/cs3org/reva/pkg/storage/fs/loader" _ "github.com/cs3org/reva/pkg/storage/registry/loader" diff --git a/go.mod b/go.mod index c415650a25..fe6f210519 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/cheggaaa/pb v1.0.29 github.com/coreos/go-oidc v2.2.1+incompatible github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e - github.com/cs3org/go-cs3apis v0.0.0-20201007120910-416ed6cf8b00 + github.com/cs3org/go-cs3apis v0.0.0-20201118090759-87929f5bae21 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59 github.com/go-ldap/ldap/v3 v3.2.4 diff --git a/go.sum b/go.sum index 19c1c01875..7c11a9bfdf 100644 --- a/go.sum +++ b/go.sum @@ -90,6 +90,8 @@ github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJff github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4= github.com/cs3org/go-cs3apis v0.0.0-20201007120910-416ed6cf8b00 h1:LVl25JaflluOchVvaHWtoCynm5OaM+VNai0IYkcCSe0= github.com/cs3org/go-cs3apis v0.0.0-20201007120910-416ed6cf8b00/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= +github.com/cs3org/go-cs3apis v0.0.0-20201118090759-87929f5bae21 h1:mZpylrgnCgSeaZ5EznvHIPIKuaQHMHZDi2wkJtk4M8Y= +github.com/cs3org/go-cs3apis v0.0.0-20201118090759-87929f5bae21/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/internal/grpc/services/gateway/storageprovider.go b/internal/grpc/services/gateway/storageprovider.go index 8537e8580d..399ab087a4 100644 --- a/internal/grpc/services/gateway/storageprovider.go +++ b/internal/grpc/services/gateway/storageprovider.go @@ -228,9 +228,14 @@ func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFi }, nil } return &gateway.InitiateFileDownloadResponse{ - Opaque: opaque, - Status: status.NewOK(ctx), - DownloadEndpoint: ep, + Status: status.NewOK(ctx), + Protocols: []*gateway.FileDownloadProtocol{ + &gateway.FileDownloadProtocol{ + Opaque: opaque, + Protocol: "simple", + DownloadEndpoint: ep, + }, + }, }, nil } @@ -310,9 +315,14 @@ func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFi }, nil } return &gateway.InitiateFileDownloadResponse{ - Opaque: opaque, - Status: status.NewOK(ctx), - DownloadEndpoint: ep, + Status: status.NewOK(ctx), + Protocols: []*gateway.FileDownloadProtocol{ + &gateway.FileDownloadProtocol{ + Opaque: opaque, + Protocol: "simple", + DownloadEndpoint: ep, + }, + }, }, nil } @@ -331,7 +341,6 @@ func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFi } func (s *svc) initiateFileDownload(ctx context.Context, req *provider.InitiateFileDownloadRequest) (*gateway.InitiateFileDownloadResponse, error) { - log := appctx.GetLogger(ctx) c, err := s.find(ctx, req.Ref) if err != nil { if _, ok := err.(errtypes.IsNotFound); ok { @@ -349,38 +358,42 @@ func (s *svc) initiateFileDownload(ctx context.Context, req *provider.InitiateFi return nil, errors.Wrap(err, "gateway: error calling InitiateFileDownload") } - res := &gateway.InitiateFileDownloadResponse{ - Opaque: storageRes.Opaque, - Status: storageRes.Status, - DownloadEndpoint: storageRes.DownloadEndpoint, - } + protocols := make([]*gateway.FileDownloadProtocol, len(storageRes.Protocols)) + for p := range storageRes.Protocols { + protocols[p] = &gateway.FileDownloadProtocol{ + Opaque: storageRes.Protocols[p].Opaque, + Protocol: storageRes.Protocols[p].Protocol, + DownloadEndpoint: storageRes.Protocols[p].DownloadEndpoint, + } - if storageRes.Expose { - log.Info().Msg("download is routed directly to data server - skipping data gateway") - return res, nil - } + if !storageRes.Protocols[p].Expose { + // sign the download location and pass it to the data gateway + u, err := url.Parse(protocols[p].DownloadEndpoint) + if err != nil { + return &gateway.InitiateFileDownloadResponse{ + Status: status.NewInternal(ctx, err, "wrong format for download endpoint"), + }, nil + } - // sign the download location and pass it to the data gateway - u, err := url.Parse(res.DownloadEndpoint) - if err != nil { - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewInternal(ctx, err, "wrong format for download endpoint"), - }, nil - } + // TODO(labkode): calculate signature of the whole request? we only sign the URI now. Maybe worth https://tools.ietf.org/html/draft-cavage-http-signatures-11 + target := u.String() + token, err := s.sign(ctx, target) + if err != nil { + return &gateway.InitiateFileDownloadResponse{ + Status: status.NewInternal(ctx, err, "error creating signature for download"), + }, nil + } - // TODO(labkode): calculate signature of the whole request? we only sign the URI now. Maybe worth https://tools.ietf.org/html/draft-cavage-http-signatures-11 - target := u.String() - token, err := s.sign(ctx, target) - if err != nil { - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewInternal(ctx, err, "error creating signature for download"), - }, nil + protocols[p].DownloadEndpoint = s.c.DataGatewayEndpoint + protocols[p].Token = token + } } - res.DownloadEndpoint = s.c.DataGatewayEndpoint - res.Token = token - - return res, nil + return &gateway.InitiateFileDownloadResponse{ + Opaque: storageRes.Opaque, + Status: storageRes.Status, + Protocols: protocols, + }, nil } func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*gateway.InitiateFileUploadResponse, error) { @@ -481,9 +494,14 @@ func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFile }, nil } return &gateway.InitiateFileUploadResponse{ - Opaque: opaque, - Status: status.NewOK(ctx), - UploadEndpoint: ep, + Status: status.NewOK(ctx), + Protocols: []*gateway.FileUploadProtocol{ + &gateway.FileUploadProtocol{ + Opaque: opaque, + Protocol: "simple", + UploadEndpoint: ep, + }, + }, }, nil } @@ -558,9 +576,14 @@ func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFile }, nil } return &gateway.InitiateFileUploadResponse{ - Opaque: opaque, - Status: status.NewOK(ctx), - UploadEndpoint: ep, + Status: status.NewOK(ctx), + Protocols: []*gateway.FileUploadProtocol{ + &gateway.FileUploadProtocol{ + Opaque: opaque, + Protocol: "simple", + UploadEndpoint: ep, + }, + }, }, nil } @@ -578,7 +601,6 @@ func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFile } func (s *svc) initiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*gateway.InitiateFileUploadResponse, error) { - log := appctx.GetLogger(ctx) c, err := s.find(ctx, req.Ref) if err != nil { if _, ok := err.(errtypes.IsNotFound); ok { @@ -614,39 +636,43 @@ func (s *svc) initiateFileUpload(ctx context.Context, req *provider.InitiateFile } } - res := &gateway.InitiateFileUploadResponse{ - Opaque: storageRes.Opaque, - Status: storageRes.Status, - UploadEndpoint: storageRes.UploadEndpoint, - AvailableChecksums: storageRes.AvailableChecksums, - } + protocols := make([]*gateway.FileUploadProtocol, len(storageRes.Protocols)) + for p := range storageRes.Protocols { + protocols[p] = &gateway.FileUploadProtocol{ + Opaque: storageRes.Protocols[p].Opaque, + Protocol: storageRes.Protocols[p].Protocol, + UploadEndpoint: storageRes.Protocols[p].UploadEndpoint, + AvailableChecksums: storageRes.Protocols[p].AvailableChecksums, + } - if storageRes.Expose { - log.Info().Msg("upload is routed directly to data server - skipping data gateway") - return res, nil - } + if !storageRes.Protocols[p].Expose { + // sign the upload location and pass it to the data gateway + u, err := url.Parse(protocols[p].UploadEndpoint) + if err != nil { + return &gateway.InitiateFileUploadResponse{ + Status: status.NewInternal(ctx, err, "wrong format for upload endpoint"), + }, nil + } - // sign the upload location and pass it to the data gateway - u, err := url.Parse(res.UploadEndpoint) - if err != nil { - return &gateway.InitiateFileUploadResponse{ - Status: status.NewInternal(ctx, err, "wrong format for upload endpoint"), - }, nil - } + // TODO(labkode): calculate signature of the whole request? we only sign the URI now. Maybe worth https://tools.ietf.org/html/draft-cavage-http-signatures-11 + target := u.String() + token, err := s.sign(ctx, target) + if err != nil { + return &gateway.InitiateFileUploadResponse{ + Status: status.NewInternal(ctx, err, "error creating signature for upload"), + }, nil + } - // TODO(labkode): calculate signature of the url, we only sign the URI. At some points maybe worth https://tools.ietf.org/html/draft-cavage-http-signatures-11 - target := u.String() - token, err := s.sign(ctx, target) - if err != nil { - return &gateway.InitiateFileUploadResponse{ - Status: status.NewInternal(ctx, err, "error creating signature for download"), - }, nil + protocols[p].UploadEndpoint = s.c.DataGatewayEndpoint + protocols[p].Token = token + } } - res.UploadEndpoint = s.c.DataGatewayEndpoint - res.Token = token - - return res, nil + return &gateway.InitiateFileUploadResponse{ + Opaque: storageRes.Opaque, + Status: storageRes.Status, + Protocols: protocols, + }, nil } func (s *svc) GetPath(ctx context.Context, req *provider.GetPathRequest) (*provider.GetPathResponse, error) { diff --git a/internal/grpc/services/publicstorageprovider/publicstorageprovider.go b/internal/grpc/services/publicstorageprovider/publicstorageprovider.go index 9af3f5bb44..7fbd345011 100644 --- a/internal/grpc/services/publicstorageprovider/publicstorageprovider.go +++ b/internal/grpc/services/publicstorageprovider/publicstorageprovider.go @@ -185,16 +185,24 @@ func (s *service) initiateFileDownload(ctx context.Context, req *provider.Initia }, nil } - if !strings.HasSuffix(dRes.DownloadEndpoint, "/") { - dRes.DownloadEndpoint += "/" + protocols := make([]*provider.FileDownloadProtocol, len(dRes.Protocols)) + for p := range dRes.Protocols { + if !strings.HasSuffix(dRes.Protocols[p].DownloadEndpoint, "/") { + dRes.Protocols[p].DownloadEndpoint += "/" + } + dRes.Protocols[p].DownloadEndpoint += dRes.Protocols[p].Token + + protocols = append(protocols, &provider.FileDownloadProtocol{ + Opaque: dRes.Protocols[p].Opaque, + Protocol: dRes.Protocols[p].Protocol, + DownloadEndpoint: dRes.Protocols[p].DownloadEndpoint, + Expose: true, // the gateway already has encoded the upload endpoint + }) } - dRes.DownloadEndpoint += dRes.Token return &provider.InitiateFileDownloadResponse{ - Opaque: req.Opaque, - Status: dRes.Status, - DownloadEndpoint: dRes.DownloadEndpoint, - Expose: true, // the gateway already has encoded the upload endpoint + Status: dRes.Status, + Protocols: protocols, }, nil } @@ -221,17 +229,25 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate }, nil } - if !strings.HasSuffix(uRes.UploadEndpoint, "/") { - uRes.UploadEndpoint += "/" + protocols := make([]*provider.FileUploadProtocol, len(uRes.Protocols)) + for p := range uRes.Protocols { + if !strings.HasSuffix(uRes.Protocols[p].UploadEndpoint, "/") { + uRes.Protocols[p].UploadEndpoint += "/" + } + uRes.Protocols[p].UploadEndpoint += uRes.Protocols[p].Token + + protocols = append(protocols, &provider.FileUploadProtocol{ + Opaque: uRes.Protocols[p].Opaque, + Protocol: uRes.Protocols[p].Protocol, + UploadEndpoint: uRes.Protocols[p].UploadEndpoint, + AvailableChecksums: uRes.Protocols[p].AvailableChecksums, + Expose: true, // the gateway already has encoded the upload endpoint + }) } - uRes.UploadEndpoint += uRes.Token res := &provider.InitiateFileUploadResponse{ - UploadEndpoint: uRes.UploadEndpoint, - Status: uRes.Status, - AvailableChecksums: uRes.AvailableChecksums, - Opaque: uRes.Opaque, - Expose: true, // the gateway already has encoded the upload endpoint + Status: uRes.Status, + Protocols: protocols, } return res, nil diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index 957491920d..c47803d37e 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -273,12 +273,21 @@ func (s *service) InitiateFileDownload(ctx context.Context, req *provider.Initia Status: status.NewInternal(ctx, err, "error unwrapping path"), }, nil } - u.Path = path.Join(u.Path, newRef.GetPath()) + + // Currently, we only support the simple protocol for GET requests + // Once we have multiple protocols, this would be moved to the fs layer + u.Path = path.Join(u.Path, "simple", newRef.GetPath()) + log.Info().Str("data-server", u.String()).Str("fn", req.Ref.GetPath()).Msg("file download") res := &provider.InitiateFileDownloadResponse{ - DownloadEndpoint: u.String(), - Status: status.NewOK(ctx), - Expose: s.conf.ExposeDataServer, + Protocols: []*provider.FileDownloadProtocol{ + &provider.FileDownloadProtocol{ + Protocol: "simple", + DownloadEndpoint: u.String(), + Expose: s.conf.ExposeDataServer, + }, + }, + Status: status.NewOK(ctx), } return res, nil } @@ -314,7 +323,7 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate metadata["mtime"] = string(req.Opaque.Map["X-OC-Mtime"].Value) } } - uploadID, err := s.storage.InitiateUpload(ctx, newRef, uploadLength, metadata) + uploadIDs, err := s.storage.InitiateUpload(ctx, newRef, uploadLength, metadata) if err != nil { var st *rpc.Status switch err.(type) { @@ -330,23 +339,27 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate }, nil } - u := *s.dataServerURL - u.Path = path.Join(u.Path, uploadID) - if err != nil { - return &provider.InitiateFileUploadResponse{ - Status: status.NewInternal(ctx, err, "error parsing data server URL"), - }, nil + protocols := make([]*provider.FileUploadProtocol, len(uploadIDs)) + var i int + for protocol, ID := range uploadIDs { + u := *s.dataServerURL + u.Path = path.Join(u.Path, protocol, ID) + protocols[i] = &provider.FileUploadProtocol{ + Protocol: protocol, + UploadEndpoint: u.String(), + AvailableChecksums: s.availableXS, + Expose: s.conf.ExposeDataServer, + } + i++ + log.Info().Str("data-server", u.String()). + Str("fn", req.Ref.GetPath()). + Str("xs", fmt.Sprintf("%+v", s.conf.AvailableXS)). + Msg("file upload") } - log.Info().Str("data-server", u.String()). - Str("fn", req.Ref.GetPath()). - Str("xs", fmt.Sprintf("%+v", s.conf.AvailableXS)). - Msg("file upload") res := &provider.InitiateFileUploadResponse{ - UploadEndpoint: u.String(), - Status: status.NewOK(ctx), - AvailableChecksums: s.availableXS, - Expose: s.conf.ExposeDataServer, + Protocols: protocols, + Status: status.NewOK(ctx), } return res, nil } diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index d49491982e..c484d5cd55 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -23,12 +23,13 @@ import ( "net/http" "github.com/cs3org/reva/pkg/appctx" + datatxregistry "github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry" "github.com/cs3org/reva/pkg/rhttp/global" + "github.com/cs3org/reva/pkg/rhttp/router" "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" "github.com/mitchellh/mapstructure" "github.com/rs/zerolog" - tusd "github.com/tus/tusd/pkg/handler" ) func init() { @@ -39,6 +40,7 @@ type config struct { Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"` Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."` Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"` + DataTXs map[string]map[string]interface{} `mapstructure:"data_txs" docs:"url:pkg/rhttp/datatx/manager/simple/simple.go;The configuration for the data tx protocols"` Timeout int64 `mapstructure:"timeout"` Insecure bool `mapstructure:"insecure"` } @@ -56,6 +58,7 @@ type svc struct { conf *config handler http.Handler storage storage.FS + dataTXs map[string]http.Handler } // New returns a new datasvc @@ -72,16 +75,50 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) return nil, err } + dataTXs, err := getDataTXs(conf, fs) + if err != nil { + return nil, err + } + s := &svc{ storage: fs, conf: conf, + dataTXs: dataTXs, } err = s.setHandler() return s, err } -// Close performs cleanup. +func getFS(c *config) (storage.FS, error) { + if f, ok := registry.NewFuncs[c.Driver]; ok { + return f(c.Drivers[c.Driver]) + } + return nil, fmt.Errorf("driver not found: %s", c.Driver) +} + +func getDataTXs(c *config, fs storage.FS) (map[string]http.Handler, error) { + if c.DataTXs == nil { + c.DataTXs = make(map[string]map[string]interface{}) + } + if len(c.DataTXs) == 0 { + c.DataTXs["simple"] = make(map[string]interface{}) + c.DataTXs["tus"] = make(map[string]interface{}) + } + + txs := make(map[string]http.Handler) + for t := range c.DataTXs { + if f, ok := datatxregistry.NewFuncs[t]; ok { + if tx, err := f(c.DataTXs[t]); err == nil { + if handler, err := tx.Handler(fs); err == nil { + txs[t] = handler + } + } + } + } + return txs, nil +} + func (s *svc) Close() error { return nil } @@ -90,16 +127,6 @@ func (s *svc) Unprotected() []string { return []string{} } -// Create a new DataStore instance which is responsible for -// storing the uploaded file on disk in the specified directory. -// This path _must_ exist before we store uploads in it. -func getFS(c *config) (storage.FS, error) { - if f, ok := registry.NewFuncs[c.Driver]; ok { - return f(c.Drivers[c.Driver]) - } - return nil, fmt.Errorf("driver not found: %s", c.Driver) -} - func (s *svc) Prefix() string { return s.conf.Prefix } @@ -110,88 +137,27 @@ func (s *svc) Handler() http.Handler { func (s *svc) setHandler() error { - tusHandler := s.getTusHandler() - s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { log := appctx.GetLogger(r.Context()) log.Debug().Msgf("dataprovider routing: path=%s", r.URL.Path) - method := r.Method - // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override - if r.Header.Get("X-HTTP-Method-Override") != "" { - method = r.Header.Get("X-HTTP-Method-Override") - } + head, tail := router.ShiftPath(r.URL.Path) - switch method { - // old fashioned download. - // GET is not part of the tus.io protocol - // TODO allow range based get requests? that end before the current offset - case "GET": - s.doGet(w, r) - case "PUT": - s.doPut(w, r) - case "HEAD": - w.WriteHeader(http.StatusOK) - - // tus.io based uploads - // uploads are initiated using the CS3 APIs Initiate Upload call - case "POST": - if tusHandler != nil { - tusHandler.PostFile(w, r) - } else { - w.WriteHeader(http.StatusNotImplemented) - } - case "PATCH": - if tusHandler != nil { - tusHandler.PatchFile(w, r) - } else { - w.WriteHeader(http.StatusNotImplemented) - } - // TODO Only attach the DELETE handler if the Terminate() method is provided - case "DELETE": - if tusHandler != nil { - tusHandler.DelFile(w, r) - } else { - w.WriteHeader(http.StatusNotImplemented) - } - default: - w.WriteHeader(http.StatusNotImplemented) + if handler, ok := s.dataTXs[head]; ok { + r.URL.Path = tail + handler.ServeHTTP(w, r) return } - }) - return nil -} - -// Composable is the interface that a struct needs to implement -// to be composable, so that it can support the TUS methods -type composable interface { - UseIn(composer *tusd.StoreComposer) -} - -func (s *svc) getTusHandler() *tusd.UnroutedHandler { - composable, ok := s.storage.(composable) - if ok { - // A storage backend for tusd may consist of multiple different parts which - // handle upload creation, locking, termination and so on. The composer is a - // place where all those separated pieces are joined together. In this example - // we only use the file store but you may plug in multiple. - composer := tusd.NewStoreComposer() - - // let the composable storage tell tus which extensions it supports - composable.UseIn(composer) - - config := tusd.Config{ - BasePath: s.conf.Prefix, - StoreComposer: composer, - //Logger: logger, // TODO use logger + // If we don't find a prefix match for any of the protocols, upload the resource + // through the direct HTTP protocol + if handler, ok := s.dataTXs["simple"]; ok { + handler.ServeHTTP(w, r) + return } - handler, err := tusd.NewUnroutedHandler(config) - if err != nil { - return nil - } - return handler - } + w.WriteHeader(http.StatusInternalServerError) + }) + return nil } diff --git a/internal/http/services/dataprovider/get.go b/internal/http/services/dataprovider/get.go deleted file mode 100644 index dfd96a55d1..0000000000 --- a/internal/http/services/dataprovider/get.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2018-2020 CERN -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// In applying this license, CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -package dataprovider - -import ( - "io" - "net/http" - "strings" - - provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/cs3org/reva/pkg/appctx" - "github.com/cs3org/reva/pkg/errtypes" -) - -func (s *svc) doGet(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - log := appctx.GetLogger(ctx) - var fn string - files, ok := r.URL.Query()["filename"] - if !ok || len(files[0]) < 1 { - fn = r.URL.Path - } else { - fn = files[0] - } - - fsfn := strings.TrimPrefix(fn, s.conf.Prefix) - ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fsfn}} - - rc, err := s.storage.Download(ctx, ref) - if err != nil { - if _, ok := err.(errtypes.IsNotFound); ok { - log.Debug().Err(err).Msg("datasvc: file not found") - w.WriteHeader(http.StatusNotFound) - } else { - log.Error().Err(err).Msg("datasvc: error downloading file") - w.WriteHeader(http.StatusInternalServerError) - } - return - } - - _, err = io.Copy(w, rc) - if err != nil { - log.Error().Err(err).Msg("error copying data to response") - return - } -} diff --git a/internal/http/services/owncloud/ocdav/copy.go b/internal/http/services/owncloud/ocdav/copy.go index 54606421d3..f8a350bf5c 100644 --- a/internal/http/services/owncloud/ocdav/copy.go +++ b/internal/http/services/owncloud/ocdav/copy.go @@ -242,6 +242,13 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src return fmt.Errorf("status code %d", dRes.Status.Code) } + var downloadEP, downloadToken string + for _, p := range dRes.Protocols { + if p.Protocol == "simple" { + downloadEP, downloadToken = p.DownloadEndpoint, p.Token + } + } + // 2. get upload url uReq := &provider.InitiateFileUploadRequest{ @@ -268,13 +275,20 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src return fmt.Errorf("status code %d", uRes.Status.Code) } + var uploadEP, uploadToken string + for _, p := range uRes.Protocols { + if p.Protocol == "simple" { + uploadEP, uploadToken = p.UploadEndpoint, p.Token + } + } + // 3. do download - httpDownloadReq, err := rhttp.NewRequest(ctx, "GET", dRes.DownloadEndpoint, nil) + httpDownloadReq, err := rhttp.NewRequest(ctx, "GET", downloadEP, nil) if err != nil { return err } - httpDownloadReq.Header.Set(datagateway.TokenTransportHeader, dRes.Token) + httpDownloadReq.Header.Set(datagateway.TokenTransportHeader, downloadToken) httpDownloadRes, err := s.client.Do(httpDownloadReq) if err != nil { @@ -288,11 +302,11 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src // 4. do upload if src.GetSize() > 0 { - httpUploadReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, httpDownloadRes.Body) + httpUploadReq, err := rhttp.NewRequest(ctx, "PUT", uploadEP, httpDownloadRes.Body) if err != nil { return err } - httpUploadReq.Header.Set(datagateway.TokenTransportHeader, uRes.Token) + httpUploadReq.Header.Set(datagateway.TokenTransportHeader, uploadToken) httpUploadRes, err := s.client.Do(httpUploadReq) if err != nil { diff --git a/internal/http/services/owncloud/ocdav/get.go b/internal/http/services/owncloud/ocdav/get.go index 98fb997892..806dede12f 100644 --- a/internal/http/services/owncloud/ocdav/get.go +++ b/internal/http/services/owncloud/ocdav/get.go @@ -111,16 +111,20 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) { return } - dataServerURL := dRes.DownloadEndpoint + var ep, token string + for _, p := range dRes.Protocols { + if p.Protocol == "simple" { + ep, token = p.DownloadEndpoint, p.Token + } + } - // TODO(labkode): perform protocol switch - httpReq, err := rhttp.NewRequest(ctx, "GET", dataServerURL, nil) + httpReq, err := rhttp.NewRequest(ctx, "GET", ep, nil) if err != nil { log.Error().Err(err).Msg("error creating http request") w.WriteHeader(http.StatusInternalServerError) return } - httpReq.Header.Set(datagateway.TokenTransportHeader, dRes.Token) + httpReq.Header.Set(datagateway.TokenTransportHeader, token) httpClient := s.client httpRes, err := httpClient.Do(httpReq) diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index 36de5aa544..935a67bae0 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -237,13 +237,20 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io return } + var ep, token string + for _, p := range uRes.Protocols { + if p.Protocol == "simple" { + ep, token = p.UploadEndpoint, p.Token + } + } + if length > 0 { - httpReq, err := rhttp.NewRequest(ctx, "PUT", uRes.UploadEndpoint, content) + httpReq, err := rhttp.NewRequest(ctx, "PUT", ep, content) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } - httpReq.Header.Set(datagateway.TokenTransportHeader, uRes.Token) + httpReq.Header.Set(datagateway.TokenTransportHeader, token) httpRes, err := s.client.Do(httpReq) if err != nil { diff --git a/internal/http/services/owncloud/ocdav/tus.go b/internal/http/services/owncloud/ocdav/tus.go index 010d4c4f5b..8ef8d8ddf8 100644 --- a/internal/http/services/owncloud/ocdav/tus.go +++ b/internal/http/services/owncloud/ocdav/tus.go @@ -168,16 +168,23 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) { return } + var ep, token string + for _, p := range uRes.Protocols { + if p.Protocol == "tus" { + ep, token = p.UploadEndpoint, p.Token + } + } + // TUS clients don't understand the reva transfer token. We need to append it to the upload endpoint. // The DataGateway has to take care of pulling it back into the request header upon request arrival. - if uRes.Token != "" { - if !strings.HasSuffix(uRes.UploadEndpoint, "/") { - uRes.UploadEndpoint += "/" + if token != "" { + if !strings.HasSuffix(ep, "/") { + ep += "/" } - uRes.UploadEndpoint += uRes.Token + ep += token } - w.Header().Set("Location", uRes.UploadEndpoint) + w.Header().Set("Location", ep) // for creation-with-upload extension forward bytes to dataprovider // TODO check this really streams @@ -193,7 +200,7 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) { var httpRes *http.Response if length != 0 { - httpReq, err := rhttp.NewRequest(ctx, "PATCH", uRes.UploadEndpoint, r.Body) + httpReq, err := rhttp.NewRequest(ctx, "PATCH", ep, r.Body) if err != nil { log.Err(err).Msg("wrong request") w.WriteHeader(http.StatusInternalServerError) @@ -269,5 +276,6 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) { w.Header().Set("Last-Modified", lastModifiedString) } } + w.WriteHeader(http.StatusCreated) } diff --git a/internal/http/services/dataprovider/put.go b/pkg/rhttp/datatx/datatx.go similarity index 51% rename from internal/http/services/dataprovider/put.go rename to pkg/rhttp/datatx/datatx.go index b9b1fe9d4d..16eb64ac42 100644 --- a/internal/http/services/dataprovider/put.go +++ b/pkg/rhttp/datatx/datatx.go @@ -16,36 +16,17 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -package dataprovider +// Package datatx provides a library to abstract the complexity +// of using various data transfer protocols. +package datatx import ( "net/http" - "strings" - provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/cs3org/reva/pkg/appctx" - "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/storage" ) -func (s *svc) doPut(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - log := appctx.GetLogger(ctx) - fn := r.URL.Path - defer r.Body.Close() - - fsfn := strings.TrimPrefix(fn, s.conf.Prefix) - ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fsfn}} - - err := s.storage.Upload(ctx, ref, r.Body) - if err != nil { - if _, ok := err.(errtypes.IsPartialContent); ok { - w.WriteHeader(http.StatusPartialContent) - return - } - log.Error().Err(err).Msg("error uploading file") - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) +// DataTX provides an abstraction around various data transfer protocols. +type DataTX interface { + Handler(fs storage.FS) (http.Handler, error) } diff --git a/pkg/rhttp/datatx/manager/loader/loader.go b/pkg/rhttp/datatx/manager/loader/loader.go new file mode 100644 index 0000000000..be6eba0cc2 --- /dev/null +++ b/pkg/rhttp/datatx/manager/loader/loader.go @@ -0,0 +1,26 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package loader + +import ( + // Load core data transfer protocols + _ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/simple" + _ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/tus" + // Add your own here +) diff --git a/pkg/rhttp/datatx/manager/registry/registry.go b/pkg/rhttp/datatx/manager/registry/registry.go new file mode 100644 index 0000000000..56e152c0c4 --- /dev/null +++ b/pkg/rhttp/datatx/manager/registry/registry.go @@ -0,0 +1,34 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package registry + +import "github.com/cs3org/reva/pkg/rhttp/datatx" + +// NewFunc is the function that data transfer implementations +// should register at init time. +type NewFunc func(map[string]interface{}) (datatx.DataTX, error) + +// NewFuncs is a map containing all the registered data transfers. +var NewFuncs = map[string]NewFunc{} + +// Register registers a new data transfer new function. +// Not safe for concurrent use. Safe for use from package init. +func Register(name string, f NewFunc) { + NewFuncs[name] = f +} diff --git a/pkg/rhttp/datatx/manager/simple/simple.go b/pkg/rhttp/datatx/manager/simple/simple.go new file mode 100644 index 0000000000..a48fec6672 --- /dev/null +++ b/pkg/rhttp/datatx/manager/simple/simple.go @@ -0,0 +1,125 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package simple + +import ( + "io" + "net/http" + + "github.com/pkg/errors" + + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/rhttp/datatx" + "github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry" + "github.com/cs3org/reva/pkg/storage" + "github.com/mitchellh/mapstructure" +) + +func init() { + registry.Register("simple", New) +} + +type config struct{} + +type manager struct { + conf *config +} + +func parseConfig(m map[string]interface{}) (*config, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error decoding conf") + return nil, err + } + return c, nil +} + +// New returns a datatx manager implementation that relies on HTTP PUT/GET. +func New(m map[string]interface{}) (datatx.DataTX, error) { + c, err := parseConfig(m) + if err != nil { + return nil, err + } + + return &manager{conf: c}, nil +} + +func (m *manager) Handler(fs storage.FS) (http.Handler, error) { + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + ctx := r.Context() + log := appctx.GetLogger(ctx) + var fn string + files, ok := r.URL.Query()["filename"] + if !ok || len(files[0]) < 1 { + fn = r.URL.Path + } else { + fn = files[0] + } + + ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fn}} + + rc, err := fs.Download(ctx, ref) + if err != nil { + if _, ok := err.(errtypes.IsNotFound); ok { + log.Err(err).Msg("datasvc: file not found") + w.WriteHeader(http.StatusNotFound) + } else { + log.Err(err).Msg("datasvc: error downloading file") + w.WriteHeader(http.StatusInternalServerError) + } + return + } + + _, err = io.Copy(w, rc) + if err != nil { + log.Error().Err(err).Msg("error copying data to response") + return + } + + case "PUT": + ctx := r.Context() + log := appctx.GetLogger(ctx) + fn := r.URL.Path + defer r.Body.Close() + + ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fn}} + + err := fs.Upload(ctx, ref, r.Body) + if err != nil { + if _, ok := err.(errtypes.IsPartialContent); ok { + w.WriteHeader(http.StatusPartialContent) + return + } + log.Error().Err(err).Msg("error uploading file") + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + + default: + w.WriteHeader(http.StatusNotImplemented) + } + }) + return h, nil +} diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go new file mode 100644 index 0000000000..3453054562 --- /dev/null +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -0,0 +1,152 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package tus + +import ( + "io" + "net/http" + + "github.com/pkg/errors" + + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/rhttp/datatx" + "github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry" + "github.com/cs3org/reva/pkg/storage" + "github.com/mitchellh/mapstructure" + tusd "github.com/tus/tusd/pkg/handler" +) + +func init() { + registry.Register("tus", New) +} + +type config struct{} + +type manager struct { + conf *config +} + +func parseConfig(m map[string]interface{}) (*config, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error decoding conf") + return nil, err + } + return c, nil +} + +// New returns a datatx manager implementation that relies on HTTP PUT/GET. +func New(m map[string]interface{}) (datatx.DataTX, error) { + c, err := parseConfig(m) + if err != nil { + return nil, err + } + + return &manager{conf: c}, nil +} + +func (m *manager) Handler(fs storage.FS) (http.Handler, error) { + composable, ok := fs.(composable) + if !ok { + return nil, errtypes.NotSupported("file system does not support the tus protocol") + } + + // A storage backend for tusd may consist of multiple different parts which + // handle upload creation, locking, termination and so on. The composer is a + // place where all those separated pieces are joined together. In this example + // we only use the file store but you may plug in multiple. + composer := tusd.NewStoreComposer() + + // let the composable storage tell tus which extensions it supports + composable.UseIn(composer) + + config := tusd.Config{ + StoreComposer: composer, + } + + handler, err := tusd.NewUnroutedHandler(config) + if err != nil { + return nil, err + } + + h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + log := appctx.GetLogger(r.Context()) + log.Info().Msgf("tusd routing: path=%s", r.URL.Path) + + method := r.Method + // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override + if r.Header.Get("X-HTTP-Method-Override") != "" { + method = r.Header.Get("X-HTTP-Method-Override") + } + + switch method { + case "POST": + handler.PostFile(w, r) + case "HEAD": + handler.HeadFile(w, r) + case "PATCH": + handler.PatchFile(w, r) + case "DELETE": + handler.DelFile(w, r) + // TODO(pvince81): allow for range-based requests? + case "GET": + ctx := r.Context() + log := appctx.GetLogger(ctx) + var fn string + files, ok := r.URL.Query()["filename"] + if !ok || len(files[0]) < 1 { + fn = r.URL.Path + } else { + fn = files[0] + } + + ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fn}} + + rc, err := fs.Download(ctx, ref) + if err != nil { + if _, ok := err.(errtypes.IsNotFound); ok { + log.Err(err).Msg("datasvc: file not found") + w.WriteHeader(http.StatusNotFound) + } else { + log.Err(err).Msg("datasvc: error downloading file") + w.WriteHeader(http.StatusInternalServerError) + } + return + } + + _, err = io.Copy(w, rc) + if err != nil { + log.Error().Err(err).Msg("error copying data to response") + return + } + default: + w.WriteHeader(http.StatusNotImplemented) + } + })) + + return h, nil +} + +// Composable is the interface that a struct needs to implement +// to be composable, so that it can support the TUS methods +type composable interface { + UseIn(composer *tusd.StoreComposer) +} diff --git a/pkg/sdk/action/action_test.go b/pkg/sdk/action/action_test.go deleted file mode 100644 index 4052e99f89..0000000000 --- a/pkg/sdk/action/action_test.go +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright 2018-2020 CERN -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// In applying this license, CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -package action_test - -import ( - "fmt" - "testing" - - "github.com/cs3org/reva/pkg/sdk/action" - testintl "github.com/cs3org/reva/pkg/sdk/common/testing" -) - -func TestActions(t *testing.T) { - tests := []struct { - host string - username string - password string - }{ - {"sciencemesh-test.uni-muenster.de:9600", "test", "testpass"}, - } - - for _, test := range tests { - t.Run(test.host, func(t *testing.T) { - // Prepare the session - if session, err := testintl.CreateTestSession(test.host, test.username, test.password); err == nil { - // Try creating a directory - if act, err := action.NewFileOperationsAction(session); err == nil { - if err := act.MakePath("/home/subdir/subsub"); err != nil { - t.Errorf(testintl.FormatTestError("FileOperationsAction.MakePath", err, "/home/subdir/subsub")) - } - } else { - t.Errorf(testintl.FormatTestError("NewFileOperationsAction", err, session)) - } - - // Try uploading - if act, err := action.NewUploadAction(session); err == nil { - act.EnableTUS = true - if _, err := act.UploadBytes([]byte("HELLO WORLD!\n"), "/home/subdir/tests.txt"); err != nil { - t.Errorf(testintl.FormatTestError("UploadAction.UploadBytes", err, []byte("HELLO WORLD!\n"), "/home/subdir/tests.txt")) - } - } else { - t.Errorf(testintl.FormatTestError("NewUploadAction", err, session)) - } - - // Try moving - if act, err := action.NewFileOperationsAction(session); err == nil { - if err := act.MoveTo("/home/subdir/tests.txt", "/home/subdir/subtest"); err != nil { - t.Errorf(testintl.FormatTestError("FileOperationsAction.MoveTo", err, "/home/subdir/tests.txt", "/home/subdir/subtest")) - } - } else { - t.Errorf(testintl.FormatTestError("NewFileOperationsAction", err, session)) - } - - // Try downloading - if act, err := action.NewDownloadAction(session); err == nil { - if _, err := act.DownloadFile("/home/subdir/subtest/tests.txt"); err != nil { - t.Errorf(testintl.FormatTestError("DownloadAction.DownloadFile", err, "/home/subdir/subtest/tests.txt")) - } - } else { - t.Errorf(testintl.FormatTestError("NewDownloadAction", err, session)) - } - - // Try listing - if act, err := action.NewEnumFilesAction(session); err == nil { - if _, err := act.ListFiles("/home", true); err != nil { - t.Errorf(testintl.FormatTestError("EnumFilesAction.ListFiles", err, "/home", true)) - } - } else { - t.Errorf(testintl.FormatTestError("NewEnumFilesAction", err, session)) - } - - // Try deleting a directory - if act, err := action.NewFileOperationsAction(session); err == nil { - if err := act.Remove("/home/subdir"); err != nil { - t.Errorf(testintl.FormatTestError("FileOperationsAction.Remove", err, "/home/subdir")) - } - } else { - t.Errorf(testintl.FormatTestError("NewFileOperationsAction", err, session)) - } - - // Try accessing some files and directories - if act, err := action.NewFileOperationsAction(session); err == nil { - if act.FileExists("/home/blargh.txt") { - t.Errorf(testintl.FormatTestError("FileOperationsAction.FileExists", fmt.Errorf("non-existing file reported as existing"), "/home/blargh.txt")) - } - - if !act.DirExists("/home") { - t.Errorf(testintl.FormatTestError("FileOperationsAction.DirExists", fmt.Errorf("/home dir reported as non-existing"), "/home")) - } - } else { - t.Errorf(testintl.FormatTestError("NewFileOperationsAction", err, session)) - } - } else { - t.Errorf(testintl.FormatTestError("CreateTestSession", err)) - } - }) - } -} diff --git a/pkg/sdk/action/download.go b/pkg/sdk/action/download.go index 22293fbc6c..18fd060ba0 100644 --- a/pkg/sdk/action/download.go +++ b/pkg/sdk/action/download.go @@ -24,7 +24,7 @@ import ( gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" storage "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - + "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/sdk" "github.com/cs3org/reva/pkg/sdk/common/net" ) @@ -60,24 +60,29 @@ func (action *DownloadAction) Download(fileInfo *storage.ResourceInfo) ([]byte, return nil, err } + p, err := getDownloadProtocolInfo(download.Protocols, "simple") + if err != nil { + return nil, err + } + // Try to get the file via WebDAV first - if client, values, err := net.NewWebDAVClientWithOpaque(download.DownloadEndpoint, download.Opaque); err == nil { + if client, values, err := net.NewWebDAVClientWithOpaque(p.DownloadEndpoint, p.Opaque); err == nil { data, err := client.Read(values[net.WebDAVPathName]) if err != nil { - return nil, fmt.Errorf("error while reading from '%v' via WebDAV: %v", download.DownloadEndpoint, err) + return nil, fmt.Errorf("error while reading from '%v' via WebDAV: %v", p.DownloadEndpoint, err) } return data, nil } // WebDAV is not supported, so directly read the HTTP endpoint - request, err := action.session.NewHTTPRequest(download.DownloadEndpoint, "GET", download.Token, nil) + request, err := action.session.NewHTTPRequest(p.DownloadEndpoint, "GET", p.Token, nil) if err != nil { - return nil, fmt.Errorf("unable to create an HTTP request for '%v': %v", download.DownloadEndpoint, err) + return nil, fmt.Errorf("unable to create an HTTP request for '%v': %v", p.DownloadEndpoint, err) } data, err := request.Do(true) if err != nil { - return nil, fmt.Errorf("error while reading from '%v' via HTTP: %v", download.DownloadEndpoint, err) + return nil, fmt.Errorf("error while reading from '%v' via HTTP: %v", p.DownloadEndpoint, err) } return data, nil } @@ -98,6 +103,15 @@ func (action *DownloadAction) initiateDownload(fileInfo *storage.ResourceInfo) ( return res, nil } +func getDownloadProtocolInfo(protocolInfos []*gateway.FileDownloadProtocol, protocol string) (*gateway.FileDownloadProtocol, error) { + for _, p := range protocolInfos { + if p.Protocol == protocol { + return p, nil + } + } + return nil, errtypes.NotFound(protocol) +} + // NewDownloadAction creates a new download action. func NewDownloadAction(session *sdk.Session) (*DownloadAction, error) { action := &DownloadAction{} diff --git a/pkg/sdk/action/upload.go b/pkg/sdk/action/upload.go index 7e517cc93c..c89f86b5d5 100644 --- a/pkg/sdk/action/upload.go +++ b/pkg/sdk/action/upload.go @@ -32,6 +32,7 @@ import ( storage "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/sdk" "github.com/cs3org/reva/pkg/sdk/common" "github.com/cs3org/reva/pkg/sdk/common/crypto" @@ -86,14 +87,19 @@ func (action *UploadAction) upload(data io.Reader, dataInfo os.FileInfo, target return nil, err } + simpleProtocol, err := getUploadProtocolInfo(upload.Protocols, "simple") + if err != nil { + return nil, err + } + // Try to upload the file via WebDAV first - if client, values, err := net.NewWebDAVClientWithOpaque(upload.UploadEndpoint, upload.Opaque); err == nil { + if client, values, err := net.NewWebDAVClientWithOpaque(simpleProtocol.UploadEndpoint, simpleProtocol.Opaque); err == nil { if err := client.Write(values[net.WebDAVPathName], data, dataInfo.Size()); err != nil { - return nil, fmt.Errorf("error while writing to '%v' via WebDAV: %v", upload.UploadEndpoint, err) + return nil, fmt.Errorf("error while writing to '%v' via WebDAV: %v", simpleProtocol.UploadEndpoint, err) } } else { // WebDAV is not supported, so directly write to the HTTP endpoint - checksumType := action.selectChecksumType(upload.AvailableChecksums) + checksumType := action.selectChecksumType(simpleProtocol.AvailableChecksums) checksumTypeName := crypto.GetChecksumTypeName(checksumType) checksum, err := crypto.ComputeChecksum(checksumType, data) if err != nil { @@ -106,13 +112,15 @@ func (action *UploadAction) upload(data io.Reader, dataInfo os.FileInfo, target } if action.EnableTUS { - if err := action.uploadFileTUS(upload, target, data, dataInfo, checksum, checksumTypeName); err != nil { - return nil, fmt.Errorf("error while writing to '%v' via TUS: %v", upload.UploadEndpoint, err) + tusProtocol, err := getUploadProtocolInfo(upload.Protocols, "tus") + if err != nil { + return nil, err } - } else { - if err := action.uploadFilePUT(upload, data, checksum, checksumTypeName); err != nil { - return nil, fmt.Errorf("error while writing to '%v' via HTTP: %v", upload.UploadEndpoint, err) + if err := action.uploadFileTUS(tusProtocol, target, data, dataInfo, checksum, checksumTypeName); err != nil { + return nil, fmt.Errorf("error while writing to '%v' via TUS: %v", tusProtocol.UploadEndpoint, err) } + } else if err := action.uploadFilePUT(simpleProtocol, data, checksum, checksumTypeName); err != nil { + return nil, fmt.Errorf("error while writing to '%v' via HTTP: %v", simpleProtocol.UploadEndpoint, err) } } @@ -145,6 +153,15 @@ func (action *UploadAction) initiateUpload(target string, size int64) (*gateway. return res, nil } +func getUploadProtocolInfo(protocolInfos []*gateway.FileUploadProtocol, protocol string) (*gateway.FileUploadProtocol, error) { + for _, p := range protocolInfos { + if p.Protocol == protocol { + return p, nil + } + } + return nil, errtypes.NotFound(protocol) +} + func (action *UploadAction) selectChecksumType(checksumTypes []*provider.ResourceChecksumPriority) provider.ResourceChecksumType { var selChecksumType provider.ResourceChecksumType var maxPrio uint32 = math.MaxUint32 @@ -157,7 +174,7 @@ func (action *UploadAction) selectChecksumType(checksumTypes []*provider.Resourc return selChecksumType } -func (action *UploadAction) uploadFilePUT(upload *gateway.InitiateFileUploadResponse, data io.Reader, checksum string, checksumType string) error { +func (action *UploadAction) uploadFilePUT(upload *gateway.FileUploadProtocol, data io.Reader, checksum string, checksumType string) error { request, err := action.session.NewHTTPRequest(upload.UploadEndpoint, "PUT", upload.Token, data) if err != nil { return fmt.Errorf("unable to create HTTP request for '%v': %v", upload.UploadEndpoint, err) @@ -172,7 +189,7 @@ func (action *UploadAction) uploadFilePUT(upload *gateway.InitiateFileUploadResp return err } -func (action *UploadAction) uploadFileTUS(upload *gateway.InitiateFileUploadResponse, target string, data io.Reader, fileInfo os.FileInfo, checksum string, checksumType string) error { +func (action *UploadAction) uploadFileTUS(upload *gateway.FileUploadProtocol, target string, data io.Reader, fileInfo os.FileInfo, checksum string, checksumType string) error { tusClient, err := net.NewTUSClient(upload.UploadEndpoint, action.session.Token(), upload.Token) if err != nil { return fmt.Errorf("unable to create TUS client: %v", err) diff --git a/pkg/sdk/sdk_test.go b/pkg/sdk/sdk_test.go deleted file mode 100644 index b2c5a1c3cc..0000000000 --- a/pkg/sdk/sdk_test.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2018-2020 CERN -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// In applying this license, CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -package sdk_test - -import ( - "fmt" - "testing" - - "github.com/cs3org/reva/pkg/sdk" - testintl "github.com/cs3org/reva/pkg/sdk/common/testing" -) - -func TestSession(t *testing.T) { - tests := []struct { - host string - username string - password string - shouldList bool - shouldLogin bool - }{ - {"sciencemesh-test.uni-muenster.de:9600", "test", "testpass", true, true}, - {"sciencemesh.cernbox.cern.ch:443", "invalid", "invalid", true, false}, - {"google.de:443", "invalid", "invalid", false, false}, - } - - for _, test := range tests { - t.Run(test.host, func(t *testing.T) { - if session, err := sdk.NewSession(); err == nil { - if err := session.Initiate(test.host, false); err == nil { - if _, err := session.GetLoginMethods(); err != nil && test.shouldList { - t.Errorf(testintl.FormatTestError("Session.GetLoginMethods", err)) - } else if err == nil && !test.shouldList { - t.Errorf(testintl.FormatTestError("Session.GetLoginMethods", fmt.Errorf("listing of login methods with an invalid host succeeded"))) - } - - if err := session.BasicLogin(test.username, test.password); err == nil { - if test.shouldLogin { - if !session.IsValid() { - t.Errorf(testintl.FormatTestError("Session.BasicLogin", fmt.Errorf("logged in, but session is invalid"), test.username, test.password)) - } - if session.Token() == "" { - t.Errorf(testintl.FormatTestError("Session.BasicLogin", fmt.Errorf("logged in, but received no token"), test.username, test.password)) - } - } else { - t.Errorf(testintl.FormatTestError("Session.BasicLogin", fmt.Errorf("logging in with invalid credentials succeeded"), test.username, test.password)) - } - } else if test.shouldLogin { - t.Errorf(testintl.FormatTestError("Session.BasicLogin", err, test.username, test.password)) - } - } else { - t.Errorf(testintl.FormatTestError("Session.Initiate", err, test.host, false)) - } - } else { - t.Errorf(testintl.FormatTestError("NewSession", err)) - } - }) - } -} diff --git a/pkg/storage/fs/eosgrpc/eosgrpc.go b/pkg/storage/fs/eosgrpc/eosgrpc.go index ea754805c3..730be34f0d 100644 --- a/pkg/storage/fs/eosgrpc/eosgrpc.go +++ b/pkg/storage/fs/eosgrpc/eosgrpc.go @@ -237,9 +237,9 @@ func New(m map[string]interface{}) (storage.FS, error) { return eosfs, nil } -// InitiateUpload returns an upload id that can be used for uploads with tus -func (fs *eosfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { - return "", errtypes.NotSupported("op not supported") +// InitiateUpload returns upload ids corresponding to different protocols it supports +func (fs *eosfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { + return nil, errtypes.NotSupported("op not supported") } func (fs *eosfs) Shutdown(ctx context.Context) error { diff --git a/pkg/storage/fs/ocis/upload.go b/pkg/storage/fs/ocis/upload.go index 06830b5edf..a41ea51cdd 100644 --- a/pkg/storage/fs/ocis/upload.go +++ b/pkg/storage/fs/ocis/upload.go @@ -49,11 +49,11 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read // Set the length to 0 and set SizeIsDeferred to true metadata := map[string]string{"sizedeferred": "true"} - uploadID, err := fs.InitiateUpload(ctx, ref, 0, metadata) + uploadIDs, err := fs.InitiateUpload(ctx, ref, 0, metadata) if err != nil { return err } - if upload, err = fs.GetUpload(ctx, uploadID); err != nil { + if upload, err = fs.GetUpload(ctx, uploadIDs["simple"]); err != nil { return errors.Wrap(err, "ocisfs: error retrieving upload") } } @@ -94,9 +94,9 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read return uploadInfo.FinishUpload(ctx) } -// InitiateUpload returns an upload id that can be used for uploads with tus +// InitiateUpload returns upload ids corresponding to different protocols it supports // TODO read optional content for small files in this request -func (fs *ocisfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { +func (fs *ocisfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { log := appctx.GetLogger(ctx) @@ -104,14 +104,14 @@ func (fs *ocisfs) InitiateUpload(ctx context.Context, ref *provider.Reference, u n, err := fs.lu.NodeFromResource(ctx, ref) if err != nil { - return "", err + return nil, err } // permissions are checked in NewUpload below relative, err = fs.lu.Path(ctx, n) if err != nil { - return "", err + return nil, err } info := tusd.FileInfo{ @@ -135,12 +135,15 @@ func (fs *ocisfs) InitiateUpload(ctx context.Context, ref *provider.Reference, u upload, err := fs.NewUpload(ctx, info) if err != nil { - return "", err + return nil, err } info, _ = upload.GetInfo(ctx) - return info.ID, nil + return map[string]string{ + "simple": info.ID, + "tus": info.ID, + }, nil } // UseIn tells the tus upload middleware which extensions it supports. diff --git a/pkg/storage/fs/owncloud/upload.go b/pkg/storage/fs/owncloud/upload.go index a8e89f9442..ea85322721 100644 --- a/pkg/storage/fs/owncloud/upload.go +++ b/pkg/storage/fs/owncloud/upload.go @@ -49,11 +49,11 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl // Set the length to 0 and set SizeIsDeferred to true metadata := map[string]string{"sizedeferred": "true"} - uploadID, err := fs.InitiateUpload(ctx, ref, 0, metadata) + uploadIDs, err := fs.InitiateUpload(ctx, ref, 0, metadata) if err != nil { return err } - if upload, err = fs.GetUpload(ctx, uploadID); err != nil { + if upload, err = fs.GetUpload(ctx, uploadIDs["simple"]); err != nil { return errors.Wrap(err, "ocfs: error retrieving upload") } } @@ -94,12 +94,12 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl return uploadInfo.FinishUpload(ctx) } -// InitiateUpload returns an upload id that can be used for uploads with tus +// InitiateUpload returns upload ids corresponding to different protocols it supports // TODO read optional content for small files in this request -func (fs *ocfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { +func (fs *ocfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { ip, err := fs.resolve(ctx, ref) if err != nil { - return "", errors.Wrap(err, "ocfs: error resolving reference") + return nil, errors.Wrap(err, "ocfs: error resolving reference") } // permissions are checked in NewUpload below @@ -125,12 +125,15 @@ func (fs *ocfs) InitiateUpload(ctx context.Context, ref *provider.Reference, upl upload, err := fs.NewUpload(ctx, info) if err != nil { - return "", err + return nil, err } info, _ = upload.GetInfo(ctx) - return info.ID, nil + return map[string]string{ + "simple": info.ID, + "tus": info.ID, + }, nil } // UseIn tells the tus upload middleware which extensions it supports. diff --git a/pkg/storage/fs/s3/upload.go b/pkg/storage/fs/s3/upload.go index 2feb44702e..52c68186c2 100644 --- a/pkg/storage/fs/s3/upload.go +++ b/pkg/storage/fs/s3/upload.go @@ -25,7 +25,7 @@ import ( "github.com/cs3org/reva/pkg/errtypes" ) -// InitiateUpload returns an upload id that can be used for uploads with tus -func (fs *s3FS) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { - return "", errtypes.NotSupported("op not supported") +// InitiateUpload returns upload ids corresponding to different protocols it supports +func (fs *s3FS) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { + return nil, errtypes.NotSupported("op not supported") } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 98063df41b..29bff95664 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -36,7 +36,7 @@ type FS interface { Move(ctx context.Context, oldRef, newRef *provider.Reference) error GetMD(ctx context.Context, ref *provider.Reference, mdKeys []string) (*provider.ResourceInfo, error) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys []string) ([]*provider.ResourceInfo, error) - InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (string, error) + InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) ListRevisions(ctx context.Context, ref *provider.Reference) ([]*provider.FileVersion, error) diff --git a/pkg/storage/utils/eosfs/upload.go b/pkg/storage/utils/eosfs/upload.go index e60c539680..999eb3d33f 100644 --- a/pkg/storage/utils/eosfs/upload.go +++ b/pkg/storage/utils/eosfs/upload.go @@ -74,6 +74,8 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC return fs.c.Write(ctx, uid, gid, fn, r) } -func (fs *eosfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (string, error) { - return ref.GetPath(), nil +func (fs *eosfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { + return map[string]string{ + "simple": ref.GetPath(), + }, nil } diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index f749cb85aa..5699d95a45 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -47,11 +47,11 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea // Set the length to 0 and set SizeIsDeferred to true metadata := map[string]string{"sizedeferred": "true"} - uploadID, err := fs.InitiateUpload(ctx, ref, 0, metadata) + uploadIDs, err := fs.InitiateUpload(ctx, ref, 0, metadata) if err != nil { return err } - if upload, err = fs.GetUpload(ctx, uploadID); err != nil { + if upload, err = fs.GetUpload(ctx, uploadIDs["simple"]); err != nil { return errors.Wrap(err, "localfs: error retrieving upload") } } @@ -92,16 +92,16 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea return uploadInfo.FinishUpload(ctx) } -// InitiateUpload returns an upload id that can be used for uploads with tus +// InitiateUpload returns upload ids corresponding to different protocols it supports // It resolves the resource and then reuses the NewUpload function // Currently requires the uploadLength to be set // TODO to implement LengthDeferrerDataStore make size optional // TODO read optional content for small files in this request -func (fs *localfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { +func (fs *localfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { np, err := fs.resolve(ctx, ref) if err != nil { - return "", errors.Wrap(err, "localfs: error resolving reference") + return nil, errors.Wrap(err, "localfs: error resolving reference") } info := tusd.FileInfo{ @@ -123,12 +123,15 @@ func (fs *localfs) InitiateUpload(ctx context.Context, ref *provider.Reference, upload, err := fs.NewUpload(ctx, info) if err != nil { - return "", err + return nil, err } info, _ = upload.GetInfo(ctx) - return info.ID, nil + return map[string]string{ + "simple": info.ID, + "tus": info.ID, + }, nil } // UseIn tells the tus upload middleware which extensions it supports.