Skip to content

Commit

Permalink
Data transfers package (#1321)
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 authored Nov 20, 2020
1 parent f5afafc commit b875947
Show file tree
Hide file tree
Showing 31 changed files with 772 additions and 559 deletions.
11 changes: 11 additions & 0 deletions changelog/unreleased/http-datatx.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Enhancement: Add support for multiple data transfer protocols

Previously, we had to configure which data transfer protocol to use in the
dataprovider service. A previous PR added the functionality to redirect requests
to different handlers based on the request method but that would lead to
conflicts if multiple protocols don't support mutually exclusive sets of
requests. This PR adds the functionality to have multiple such handlers
simultaneously and the client can choose which protocol to use.

https://github.com/cs3org/reva/pull/1321
https://github.com/cs3org/reva/pull/1285/
39 changes: 29 additions & 10 deletions cmd/reva/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"time"

"github.com/cheggaaa/pb"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/internal/http/services/datagateway"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rhttp"
Expand Down Expand Up @@ -86,23 +86,28 @@ func downloadCommand() *command {
return formatError(res.Status)
}

p, err := getDownloadProtocolInfo(res.Protocols, "simple")
if err != nil {
return err
}

// TODO(labkode): upload to data server
fmt.Printf("Downloading from: %s\n", res.DownloadEndpoint)
fmt.Printf("Downloading from: %s\n", p.DownloadEndpoint)

content, err := checkDownloadWebdavRef(res.DownloadEndpoint, res.Opaque)
content, err := checkDownloadWebdavRef(res.Protocols)
if err != nil {
if _, ok := err.(errtypes.IsNotSupported); !ok {
return err
}

dataServerURL := res.DownloadEndpoint
dataServerURL := p.DownloadEndpoint
// TODO(labkode): do a protocol switch
httpReq, err := rhttp.NewRequest(ctx, "GET", dataServerURL, nil)
if err != nil {
return err
}

httpReq.Header.Set(datagateway.TokenTransportHeader, res.Token)
httpReq.Header.Set(datagateway.TokenTransportHeader, p.Token)
httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
// TODO make insecure configurable
Expand Down Expand Up @@ -145,13 +150,27 @@ func downloadCommand() *command {
return cmd
}

func checkDownloadWebdavRef(endpoint string, opaque *typespb.Opaque) (io.Reader, error) {
if opaque == nil {
func getDownloadProtocolInfo(protocolInfos []*gateway.FileDownloadProtocol, protocol string) (*gateway.FileDownloadProtocol, error) {
for _, p := range protocolInfos {
if p.Protocol == protocol {
return p, nil
}
}
return nil, errtypes.NotFound(protocol)
}

func checkDownloadWebdavRef(protocols []*gateway.FileDownloadProtocol) (io.Reader, error) {
p, err := getDownloadProtocolInfo(protocols, "simple")
if err != nil {
return nil, err
}

if p.Opaque == nil {
return nil, errtypes.NotSupported("opaque object not defined")
}

var token string
tokenOpaque, ok := opaque.Map["webdav-token"]
tokenOpaque, ok := p.Opaque.Map["webdav-token"]
if !ok {
return nil, errtypes.NotSupported("webdav token not defined")
}
Expand All @@ -163,7 +182,7 @@ func checkDownloadWebdavRef(endpoint string, opaque *typespb.Opaque) (io.Reader,
}

var filePath string
fileOpaque, ok := opaque.Map["webdav-file-path"]
fileOpaque, ok := p.Opaque.Map["webdav-file-path"]
if !ok {
return nil, errtypes.NotSupported("webdav file path not defined")
}
Expand All @@ -174,7 +193,7 @@ func checkDownloadWebdavRef(endpoint string, opaque *typespb.Opaque) (io.Reader,
return nil, errors.New("opaque entry decoder not recognized: " + fileOpaque.Decoder)
}

c := gowebdav.NewClient(endpoint, "", "")
c := gowebdav.NewClient(p.DownloadEndpoint, "", "")
c.SetHeader(tokenpkg.TokenHeader, token)

reader, err := c.ReadStream(filePath)
Expand Down
58 changes: 37 additions & 21 deletions cmd/reva/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cs3org/reva/internal/http/services/datagateway"
"github.com/pkg/errors"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
Expand All @@ -50,11 +51,11 @@ func uploadCommand() *command {
cmd := newCommand("upload")
cmd.Description = func() string { return "upload a local file to the remote server" }
cmd.Usage = func() string { return "Usage: upload [-flags] <file_name> <remote_target>" }
disableTusFlag := cmd.Bool("disable-tus", false, "whether to disable tus protocol")
protocolFlag := cmd.String("protocol", "tus", "the protocol to be used for uploads")
xsFlag := cmd.String("xs", "negotiate", "compute checksum")

cmd.ResetFlags = func() {
*disableTusFlag, *xsFlag = false, "negotiate"
*protocolFlag, *xsFlag = "tus", "negotiate"
}

cmd.Action = func(w ...io.Writer) error {
Expand Down Expand Up @@ -115,19 +116,23 @@ func uploadCommand() *command {
return formatError(res.Status)
}

// TODO(labkode): upload to data server
fmt.Printf("Data server: %s\n", res.UploadEndpoint)
fmt.Printf("Allowed checksums: %+v\n", res.AvailableChecksums)

if err = checkUploadWebdavRef(res.UploadEndpoint, res.Opaque, md, fd); err != nil {
if err = checkUploadWebdavRef(res.Protocols, md, fd); err != nil {
if _, ok := err.(errtypes.IsNotSupported); !ok {
return err
}
} else {
return nil
}

xsType, err := guessXS(*xsFlag, res.AvailableChecksums)
p, err := getUploadProtocolInfo(res.Protocols, *protocolFlag)
if err != nil {
return err
}

fmt.Printf("Data server: %s\n", p.UploadEndpoint)
fmt.Printf("Allowed checksums: %+v\n", p.AvailableChecksums)

xsType, err := guessXS(*xsFlag, p.AvailableChecksums)
if err != nil {
return err
}
Expand All @@ -144,15 +149,15 @@ func uploadCommand() *command {
return err
}

dataServerURL := res.UploadEndpoint
dataServerURL := p.UploadEndpoint

if *disableTusFlag {
if *protocolFlag == "simple" {
httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, fd)
if err != nil {
return err
}

httpReq.Header.Set(datagateway.TokenTransportHeader, res.Token)
httpReq.Header.Set(datagateway.TokenTransportHeader, p.Token)
q := httpReq.URL.Query()
q.Add("xs", xs)
q.Add("xs_type", storageprovider.GRPC2PKGXS(xsType).String())
Expand All @@ -178,9 +183,7 @@ func uploadCommand() *command {
if token, ok := tokenpkg.ContextGetToken(ctx); ok {
c.Header.Add(tokenpkg.TokenHeader, token)
}
if res.Token != "" {
c.Header.Add(datagateway.TokenTransportHeader, res.Token)
}
c.Header.Add(datagateway.TokenTransportHeader, p.Token)
tusc, err := tus.NewClient(dataServerURL, c)
if err != nil {
return err
Expand Down Expand Up @@ -233,13 +236,27 @@ func uploadCommand() *command {
return cmd
}

func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInfo, fd *os.File) error {
if opaque == nil {
func getUploadProtocolInfo(protocolInfos []*gateway.FileUploadProtocol, protocol string) (*gateway.FileUploadProtocol, error) {
for _, p := range protocolInfos {
if p.Protocol == protocol {
return p, nil
}
}
return nil, errtypes.NotFound(protocol)
}

func checkUploadWebdavRef(protocols []*gateway.FileUploadProtocol, md os.FileInfo, fd *os.File) error {
p, err := getUploadProtocolInfo(protocols, "simple")
if err != nil {
return err
}

if p.Opaque == nil {
return errtypes.NotSupported("opaque object not defined")
}

var token string
tokenOpaque, ok := opaque.Map["webdav-token"]
tokenOpaque, ok := p.Opaque.Map["webdav-token"]
if !ok {
return errtypes.NotSupported("webdav token not defined")
}
Expand All @@ -251,7 +268,7 @@ func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInf
}

var filePath string
fileOpaque, ok := opaque.Map["webdav-file-path"]
fileOpaque, ok := p.Opaque.Map["webdav-file-path"]
if !ok {
return errtypes.NotSupported("webdav file path not defined")
}
Expand All @@ -262,12 +279,11 @@ func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInf
return errors.New("opaque entry decoder not recognized: " + fileOpaque.Decoder)
}

c := gowebdav.NewClient(endpoint, "", "")
c := gowebdav.NewClient(p.UploadEndpoint, "", "")
c.SetHeader(tokenpkg.TokenHeader, token)
c.SetHeader("Upload-Length", strconv.FormatInt(md.Size(), 10))

err := c.WriteStream(filePath, fd, 0700)
if err != nil {
if err = c.WriteStream(filePath, fd, 0700); err != nil {
return err
}

Expand Down
1 change: 1 addition & 0 deletions cmd/revad/runtime/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
_ "github.com/cs3org/reva/pkg/ocm/provider/authorizer/loader"
_ "github.com/cs3org/reva/pkg/ocm/share/manager/loader"
_ "github.com/cs3org/reva/pkg/publicshare/manager/loader"
_ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/loader"
_ "github.com/cs3org/reva/pkg/share/manager/loader"
_ "github.com/cs3org/reva/pkg/storage/fs/loader"
_ "github.com/cs3org/reva/pkg/storage/registry/loader"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/cheggaaa/pb v1.0.29
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e
github.com/cs3org/go-cs3apis v0.0.0-20201007120910-416ed6cf8b00
github.com/cs3org/go-cs3apis v0.0.0-20201118090759-87929f5bae21
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59
github.com/go-ldap/ldap/v3 v3.2.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJff
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4=
github.com/cs3org/go-cs3apis v0.0.0-20201007120910-416ed6cf8b00 h1:LVl25JaflluOchVvaHWtoCynm5OaM+VNai0IYkcCSe0=
github.com/cs3org/go-cs3apis v0.0.0-20201007120910-416ed6cf8b00/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/go-cs3apis v0.0.0-20201118090759-87929f5bae21 h1:mZpylrgnCgSeaZ5EznvHIPIKuaQHMHZDi2wkJtk4M8Y=
github.com/cs3org/go-cs3apis v0.0.0-20201118090759-87929f5bae21/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
Loading

0 comments on commit b875947

Please sign in to comment.