Skip to content

Commit

Permalink
TUS upload support through datagateway (#878)
Browse files Browse the repository at this point in the history
  • Loading branch information
kulmann authored Jun 25, 2020
1 parent 012d3c2 commit 743296a
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 14 deletions.
4 changes: 3 additions & 1 deletion cmd/reva/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"net/http"
"os"

"github.com/cs3org/reva/internal/http/services/datagateway"

"github.com/cheggaaa/pb"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -89,7 +91,7 @@ func downloadCommand() *command {
return err
}

httpReq.Header.Set("X-Reva-Transfer", res.Token)
httpReq.Header.Set(datagateway.TokenTransportHeader, res.Token)
httpClient := rhttp.GetHTTPClient(ctx)

httpRes, err := httpClient.Do(httpReq)
Expand Down
4 changes: 3 additions & 1 deletion cmd/reva/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"path/filepath"
"strconv"

"github.com/cs3org/reva/internal/http/services/datagateway"

"github.com/cheggaaa/pb"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -134,7 +136,7 @@ func uploadCommand() *command {
}
if res.Token != "" {
fmt.Printf("using X-Reva-Transfer header\n")
c.Header.Add("X-Reva-Transfer", res.Token)
c.Header.Add(datagateway.TokenTransportHeader, res.Token)
} else if token, ok := tokenpkg.ContextGetToken(ctx); ok {
fmt.Printf("using %s header\n", tokenpkg.TokenHeader)
c.Header.Add(tokenpkg.TokenHeader, token)
Expand Down
3 changes: 0 additions & 3 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ func (c *config) init() {

c.DataServerURL = sharedconf.GetDataGateway(c.DataServerURL)

// TODO: Uploads currently don't work when ExposeDataServer is false
c.ExposeDataServer = true

// set sane defaults
if len(c.AvailableXS) == 0 {
c.AvailableXS = map[string]uint32{"md5": 100, "unset": 1000}
Expand Down
92 changes: 84 additions & 8 deletions internal/http/services/datagateway/datagateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"net/http"
"net/url"
"path"

"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
Expand All @@ -36,7 +37,8 @@ import (
)

const (
tokenTransportHeader = "X-Reva-Transfer"
// TokenTransportHeader holds the header key for the reva transfer token
TokenTransportHeader = "X-Reva-Transfer"
)

func init() {
Expand Down Expand Up @@ -110,6 +112,9 @@ func (s *svc) setHandler() {
case "PUT":
s.doPut(w, r)
return
case "PATCH":
s.doPatch(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
Expand All @@ -124,7 +129,14 @@ func addCorsHeader(res http.ResponseWriter) {
headers.Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS, HEAD")
}

func (s *svc) verify(ctx context.Context, token string) (*transferClaims, error) {
func (s *svc) verify(ctx context.Context, r *http.Request) (*transferClaims, error) {
// Extract transfer token from request header. If not existing, assume that it's the last path segment instead.
token := r.Header.Get(TokenTransportHeader)
if token == "" {
token = path.Base(r.URL.Path)
r.Header.Set(TokenTransportHeader, token)
}

j, err := jwt.ParseWithClaims(token, &transferClaims{}, func(token *jwt.Token) (interface{}, error) {
return []byte(s.conf.TransferSharedSecret), nil
})
Expand All @@ -145,11 +157,10 @@ func (s *svc) doGet(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := appctx.GetLogger(ctx)

token := r.Header.Get(tokenTransportHeader)
claims, err := s.verify(ctx, token)
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err)
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand Down Expand Up @@ -188,11 +199,10 @@ func (s *svc) doPut(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := appctx.GetLogger(ctx)

token := r.Header.Get(tokenTransportHeader)
claims, err := s.verify(ctx, token)
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", token).Msg("invalid token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand Down Expand Up @@ -238,3 +248,69 @@ func (s *svc) doPut(w http.ResponseWriter, r *http.Request) {
log.Err(err).Msg("error writing body after header were set")
}
}

// TODO: put and post code is pretty much the same. Should be solved in a nicer way in the long run.
func (s *svc) doPatch(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := appctx.GetLogger(ctx)

claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}

target := claims.Target
// add query params to target, clients can send checksums and other information.
targetURL, err := url.Parse(target)
if err != nil {
log.Err(err).Msg("datagateway: error parsing target url")
w.WriteHeader(http.StatusInternalServerError)
return
}

targetURL.RawQuery = r.URL.RawQuery
target = targetURL.String()

log.Info().Str("target", claims.Target).Msg("sending request to internal data server")

httpClient := rhttp.GetHTTPClient(ctx)
httpReq, err := rhttp.NewRequest(ctx, "PATCH", target, r.Body)
if err != nil {
log.Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header = r.Header

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing PATCH request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}

copyHeader(w.Header(), httpRes.Header)

if httpRes.StatusCode != http.StatusOK {
w.WriteHeader(httpRes.StatusCode)
return
}

defer httpRes.Body.Close()
w.WriteHeader(http.StatusOK)
_, err = io.Copy(w, httpRes.Body)
if err != nil {
log.Err(err).Msg("error writing body after header were set")
}
}

func copyHeader(dst, src http.Header) {
for key, values := range src {
for i := range values {
dst.Add(key, values[i])
}
}
}
4 changes: 3 additions & 1 deletion internal/http/services/owncloud/ocdav/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strconv"
"time"

"github.com/cs3org/reva/internal/http/services/datagateway"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/internal/http/utils"
Expand Down Expand Up @@ -103,7 +105,7 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header.Set("X-Reva-Transfer", dRes.Token)
httpReq.Header.Set(datagateway.TokenTransportHeader, dRes.Token)
httpClient := rhttp.GetHTTPClient(ctx)

httpRes, err := httpClient.Do(httpReq)
Expand Down
2 changes: 2 additions & 0 deletions internal/http/services/owncloud/ocdav/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/internal/http/services/datagateway"
"github.com/cs3org/reva/internal/http/utils"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/rhttp"
Expand Down Expand Up @@ -264,6 +265,7 @@ func (s *svc) handlePut(w http.ResponseWriter, r *http.Request, ns string) {
Str("token", tokenpkg.ContextMustGetToken(ctx)).
Msg("adding token to header")
c.Header.Set(tokenpkg.TokenHeader, tokenpkg.ContextMustGetToken(ctx))
c.Header.Set(datagateway.TokenTransportHeader, uRes.Token)

tusc, err := tus.NewClient(dataServerURL, c)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions internal/http/services/owncloud/ocdav/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package ocdav
import (
"net/http"
"path"
"strings"
"time"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
Expand Down Expand Up @@ -151,6 +152,15 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) {
return
}

// TUS clients don't understand the reva transfer token. We need to append it to the upload endpoint.
// The DataGateway has to take care of pulling it back into the request header upon request arrival.
if uRes.Token != "" {
if !strings.HasSuffix(uRes.UploadEndpoint, "/") {
uRes.UploadEndpoint += "/"
}
uRes.UploadEndpoint += uRes.Token
}

w.Header().Set("Location", uRes.UploadEndpoint)

// for creation-with-upload extension forward bytes to dataprovider
Expand Down

0 comments on commit 743296a

Please sign in to comment.