Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TUS upload support through datagateway #878

Merged
merged 8 commits into from
Jun 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/reva/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"net/http"
"os"

"github.com/cs3org/reva/internal/http/services/datagateway"

"github.com/cheggaaa/pb"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -89,7 +91,7 @@ func downloadCommand() *command {
return err
}

httpReq.Header.Set("X-Reva-Transfer", res.Token)
httpReq.Header.Set(datagateway.TokenTransportHeader, res.Token)
httpClient := rhttp.GetHTTPClient(ctx)

httpRes, err := httpClient.Do(httpReq)
Expand Down
4 changes: 3 additions & 1 deletion cmd/reva/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"path/filepath"
"strconv"

"github.com/cs3org/reva/internal/http/services/datagateway"

"github.com/cheggaaa/pb"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -134,7 +136,7 @@ func uploadCommand() *command {
}
if res.Token != "" {
fmt.Printf("using X-Reva-Transfer header\n")
c.Header.Add("X-Reva-Transfer", res.Token)
c.Header.Add(datagateway.TokenTransportHeader, res.Token)
} else if token, ok := tokenpkg.ContextGetToken(ctx); ok {
fmt.Printf("using %s header\n", tokenpkg.TokenHeader)
c.Header.Add(tokenpkg.TokenHeader, token)
Expand Down
3 changes: 0 additions & 3 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ func (c *config) init() {

c.DataServerURL = sharedconf.GetDataGateway(c.DataServerURL)

// TODO: Uploads currently don't work when ExposeDataServer is false
c.ExposeDataServer = true

// set sane defaults
if len(c.AvailableXS) == 0 {
c.AvailableXS = map[string]uint32{"md5": 100, "unset": 1000}
Expand Down
92 changes: 84 additions & 8 deletions internal/http/services/datagateway/datagateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"net/http"
"net/url"
"path"

"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
Expand All @@ -36,7 +37,8 @@ import (
)

const (
tokenTransportHeader = "X-Reva-Transfer"
// TokenTransportHeader holds the header key for the reva transfer token
TokenTransportHeader = "X-Reva-Transfer"
)

func init() {
Expand Down Expand Up @@ -110,6 +112,9 @@ func (s *svc) setHandler() {
case "PUT":
s.doPut(w, r)
return
case "PATCH":
s.doPatch(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
Expand All @@ -124,7 +129,14 @@ func addCorsHeader(res http.ResponseWriter) {
headers.Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS, HEAD")
}

func (s *svc) verify(ctx context.Context, token string) (*transferClaims, error) {
func (s *svc) verify(ctx context.Context, r *http.Request) (*transferClaims, error) {
// Extract transfer token from request header. If not existing, assume that it's the last path segment instead.
token := r.Header.Get(TokenTransportHeader)
if token == "" {
token = path.Base(r.URL.Path)
r.Header.Set(TokenTransportHeader, token)
}

j, err := jwt.ParseWithClaims(token, &transferClaims{}, func(token *jwt.Token) (interface{}, error) {
return []byte(s.conf.TransferSharedSecret), nil
})
Expand All @@ -145,11 +157,10 @@ func (s *svc) doGet(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := appctx.GetLogger(ctx)

token := r.Header.Get(tokenTransportHeader)
claims, err := s.verify(ctx, token)
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err)
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand Down Expand Up @@ -188,11 +199,10 @@ func (s *svc) doPut(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := appctx.GetLogger(ctx)

token := r.Header.Get(tokenTransportHeader)
claims, err := s.verify(ctx, token)
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", token).Msg("invalid token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand Down Expand Up @@ -238,3 +248,69 @@ func (s *svc) doPut(w http.ResponseWriter, r *http.Request) {
log.Err(err).Msg("error writing body after header were set")
}
}

// TODO: put and post code is pretty much the same. Should be solved in a nicer way in the long run.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@labkode I was pairing on this with @kulmann and while the patch works we should use the simplehttpproxy from the stdlib here ... @labkode we can look into that after this pr.

func (s *svc) doPatch(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := appctx.GetLogger(ctx)

claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}

target := claims.Target
// add query params to target, clients can send checksums and other information.
targetURL, err := url.Parse(target)
if err != nil {
log.Err(err).Msg("datagateway: error parsing target url")
w.WriteHeader(http.StatusInternalServerError)
return
}

targetURL.RawQuery = r.URL.RawQuery
target = targetURL.String()

log.Info().Str("target", claims.Target).Msg("sending request to internal data server")

httpClient := rhttp.GetHTTPClient(ctx)
httpReq, err := rhttp.NewRequest(ctx, "PATCH", target, r.Body)
if err != nil {
log.Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header = r.Header

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing PATCH request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}

copyHeader(w.Header(), httpRes.Header)

if httpRes.StatusCode != http.StatusOK {
w.WriteHeader(httpRes.StatusCode)
return
}

defer httpRes.Body.Close()
w.WriteHeader(http.StatusOK)
_, err = io.Copy(w, httpRes.Body)
if err != nil {
log.Err(err).Msg("error writing body after header were set")
}
}

func copyHeader(dst, src http.Header) {
for key, values := range src {
for i := range values {
dst.Add(key, values[i])
}
}
}
4 changes: 3 additions & 1 deletion internal/http/services/owncloud/ocdav/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strconv"
"time"

"github.com/cs3org/reva/internal/http/services/datagateway"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/internal/http/utils"
Expand Down Expand Up @@ -103,7 +105,7 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header.Set("X-Reva-Transfer", dRes.Token)
httpReq.Header.Set(datagateway.TokenTransportHeader, dRes.Token)
httpClient := rhttp.GetHTTPClient(ctx)

httpRes, err := httpClient.Do(httpReq)
Expand Down
2 changes: 2 additions & 0 deletions internal/http/services/owncloud/ocdav/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/internal/http/services/datagateway"
"github.com/cs3org/reva/internal/http/utils"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/rhttp"
Expand Down Expand Up @@ -264,6 +265,7 @@ func (s *svc) handlePut(w http.ResponseWriter, r *http.Request, ns string) {
Str("token", tokenpkg.ContextMustGetToken(ctx)).
Msg("adding token to header")
c.Header.Set(tokenpkg.TokenHeader, tokenpkg.ContextMustGetToken(ctx))
c.Header.Set(datagateway.TokenTransportHeader, uRes.Token)

tusc, err := tus.NewClient(dataServerURL, c)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions internal/http/services/owncloud/ocdav/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package ocdav
import (
"net/http"
"path"
"strings"
"time"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
Expand Down Expand Up @@ -151,6 +152,15 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) {
return
}

// TUS clients don't understand the reva transfer token. We need to append it to the upload endpoint.
// The DataGateway has to take care of pulling it back into the request header upon request arrival.
if uRes.Token != "" {
if !strings.HasSuffix(uRes.UploadEndpoint, "/") {
uRes.UploadEndpoint += "/"
}
uRes.UploadEndpoint += uRes.Token
}

w.Header().Set("Location", uRes.UploadEndpoint)

// for creation-with-upload extension forward bytes to dataprovider
Expand Down