Skip to content

Commit

Permalink
minimal tus implmentation
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic committed Apr 21, 2020
1 parent 587a792 commit 959ddc6
Show file tree
Hide file tree
Showing 15 changed files with 1,265 additions and 181 deletions.
49 changes: 32 additions & 17 deletions cmd/reva/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (
"fmt"
"io"
"math"
"net/http"
"os"
"path/filepath"

"github.com/cheggaaa/pb"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"

"github.com/eventials/go-tus"
"github.com/eventials/go-tus/memorystore"

// TODO(labkode): this should not come from this package.
"github.com/cs3org/reva/internal/grpc/services/storageprovider"
"github.com/cs3org/reva/pkg/crypto"
Expand Down Expand Up @@ -65,7 +68,7 @@ func uploadCommand() *command {

fmt.Printf("Local file size: %d bytes\n", md.Size())

client, err := getClient()
gwc, err := getClient()
if err != nil {
return err
}
Expand All @@ -78,7 +81,7 @@ func uploadCommand() *command {
},
}

res, err := client.InitiateFileUpload(ctx, req)
res, err := gwc.InitiateFileUpload(ctx, req)
if err != nil {
return err
}
Expand Down Expand Up @@ -113,39 +116,51 @@ func uploadCommand() *command {
bar.Start()
reader := bar.NewProxyReader(fd)

httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, reader)
// create the tus client.
c := tus.DefaultConfig()
c.Resume = true
c.HttpClient = rhttp.GetHTTPClient(ctx)
c.Store, err = memorystore.NewMemoryStore()
if err != nil {
return err
}
c.Header.Add("X-Reva-Transfer", res.Token)
tusc, err := tus.NewClient(dataServerURL, c)
if err != nil {
return err
}

metadata := map[string]string{
"filename": filepath.Base(target),
"dir": filepath.Dir(target),
"checksum": fmt.Sprintf("%s %s", storageprovider.GRPC2PKGXS(xsType).String(), xs),
}

fingerprint := fmt.Sprintf("%s-%d-%s-%s", md.Name(), md.Size(), md.ModTime(), xs)

httpReq.Header.Set("X-Reva-Transfer", res.Token)
q := httpReq.URL.Query()
q.Add("xs", xs)
q.Add("xs_type", storageprovider.GRPC2PKGXS(xsType).String())
httpReq.URL.RawQuery = q.Encode()
// create an upload from a file.
upload := tus.NewUpload(reader, md.Size(), metadata, fingerprint)

httpClient := rhttp.GetHTTPClient(ctx)
// create the uploader.
c.Store.Set(upload.Fingerprint, dataServerURL)
uploader := tus.NewUploader(tusc, dataServerURL, upload, 0)

httpRes, err := httpClient.Do(httpReq)
// start the uploading process.
err = uploader.Upload()
if err != nil {
return err
}
defer httpRes.Body.Close()

bar.Finish()

if httpRes.StatusCode != http.StatusOK {
return err
}

req2 := &provider.StatRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Path: target,
},
},
}
res2, err := client.Stat(ctx, req2)
res2, err := gwc.Stat(ctx, req2)
if err != nil {
return err
}
Expand Down
9 changes: 6 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/cs3org/go-cs3apis v0.0.0-20200408065125-6e23f3ecec0a
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/eventials/go-tus v0.0.0-20190617130015-9db47421f6a0
github.com/fatih/color v1.7.0 // indirect
github.com/go-openapi/strfmt v0.19.2 // indirect
github.com/gofrs/uuid v3.2.0+incompatible
github.com/golang/protobuf v1.3.5
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.1.1
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/huandu/xstrings v1.3.0 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/jedib0t/go-pretty v4.3.0+incompatible
Expand All @@ -31,12 +32,12 @@ require (
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.1
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 // indirect
github.com/rs/cors v1.7.0
github.com/rs/zerolog v1.18.0
github.com/tus/tusd v1.1.0
go.opencensus.io v0.22.3
golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
google.golang.org/grpc v1.28.1
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect
gopkg.in/cheggaaa/pb.v1 v1.0.27 // indirect
Expand All @@ -45,3 +46,5 @@ require (
)

go 1.13

replace github.com/eventials/go-tus => github.com/andrewmostello/go-tus v0.0.0-20200314041820-904a9904af9a
92 changes: 67 additions & 25 deletions go.sum

Large diffs are not rendered by default.

21 changes: 19 additions & 2 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"os"
"path"
"strconv"
"strings"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
Expand Down Expand Up @@ -243,14 +244,30 @@ func (s *service) InitiateFileDownload(ctx context.Context, req *provider.Initia
func (s *service) InitiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*provider.InitiateFileUploadResponse, error) {
// TODO(labkode): same considerations as download
log := appctx.GetLogger(ctx)
url := *s.dataServerURL
newRef, err := s.unwrap(ctx, req.Ref)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error unwrapping path"),
}, nil
}
url.Path = path.Join("/", url.Path, newRef.GetPath())
var uploadLength int64
if req.Opaque != nil && req.Opaque.Map != nil && req.Opaque.Map["Upload-Length"] != nil {
var err error
uploadLength, err = strconv.ParseInt(string(req.Opaque.Map["Upload-Length"].Value), 10, 64)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error parsing upload length"),
}, nil
}
}
uploadID, err := s.storage.InitiateUpload(ctx, newRef, uploadLength)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error getting upload id"),
}, nil
}
url := *s.dataServerURL
url.Path = path.Join("/", url.Path, uploadID)
log.Info().Str("data-server", url.String()).
Str("fn", req.Ref.GetPath()).
Str("xs", fmt.Sprintf("%+v", s.conf.AvailableXS)).
Expand Down
126 changes: 89 additions & 37 deletions internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,23 @@ package dataprovider
import (
"fmt"
"net/http"
"os"

"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/rhttp/global"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
tusd "github.com/tus/tusd/pkg/handler"
)

func init() {
global.Register("dataprovider", New)
}

type config struct {
Prefix string `mapstructure:"prefix"`
Driver string `mapstructure:"driver"`
TmpFolder string `mapstructure:"tmp_folder"`
Drivers map[string]map[string]interface{} `mapstructure:"drivers"`
Prefix string `mapstructure:"prefix"`
Driver string `mapstructure:"driver"`
Drivers map[string]map[string]interface{} `mapstructure:"drivers"`
}

type svc struct {
Expand All @@ -54,18 +53,6 @@ func New(m map[string]interface{}) (global.Service, error) {
return nil, err
}

if conf.Prefix == "" {
conf.Prefix = "data"
}

if conf.TmpFolder == "" {
conf.TmpFolder = os.TempDir()
}

if err := os.MkdirAll(conf.TmpFolder, 0755); err != nil {
return nil, errors.Wrap(err, "could not create tmp dir")
}

fs, err := getFS(conf)
if err != nil {
return nil, err
Expand All @@ -75,8 +62,8 @@ func New(m map[string]interface{}) (global.Service, error) {
storage: fs,
conf: conf,
}
s.setHandler()
return s, nil
err = s.setHandler()
return s, err
}

// Close performs cleanup.
Expand All @@ -88,6 +75,12 @@ func (s *svc) Unprotected() []string {
return []string{}
}

// Create a new DataStore instance which is responsible for
// storing the uploaded file on disk in the specified directory.
// This path _must_ exist before tusd will store uploads in it.
// If you want to save them on a different medium, for example
// a remote FTP server, you can implement your own storage backend
// by implementing the tusd.DataStore interface.
func getFS(c *config) (storage.FS, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
Expand All @@ -103,24 +96,83 @@ func (s *svc) Handler() http.Handler {
return s.handler
}

func (s *svc) setHandler() {
s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "HEAD":
addCorsHeader(w)
w.WriteHeader(http.StatusOK)
return
case "GET":
s.doGet(w, r)
return
case "PUT":
s.doPut(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
// Composable is the interface that a struct needs to implement to be composable by this composer
type Composable interface {
UseIn(composer *tusd.StoreComposer)
}

func (s *svc) setHandler() (err error) {
composable, ok := s.storage.(Composable)
if ok {
// A storage backend for tusd may consist of multiple different parts which
// handle upload creation, locking, termination and so on. The composer is a
// place where all those separated pieces are joined together. In this example
// we only use the file store but you may plug in multiple.
composer := tusd.NewStoreComposer()

// let the composable storage tell tus which extensions it supports
composable.UseIn(composer)

config := tusd.Config{
BasePath: s.conf.Prefix,
StoreComposer: composer,
//Logger: logger, // TODO use logger
}

handler, err := tusd.NewUnroutedHandler(config)
if err != nil {
return err
}
})

s.handler = handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

log := appctx.GetLogger(r.Context())
log.Info().Msgf("tusd routing: path=%s", r.URL.Path)

switch r.Method {
// old fashioned download.

// GET is not part of the tus.io protocol
// currently there is no way to GET an upload that is in progress
// TODO allow range based get requests? that end before the current offset
case "GET":
s.doGet(w, r)

// tus.io based upload

// uploads are initiated using the CS3 APIs Initiate Download call
case "POST":
handler.PostFile(w, r)
case "HEAD":
handler.HeadFile(w, r)
case "PATCH":
handler.PatchFile(w, r)
// TODO Only attach the DELETE handler if the Terminate() method is provided
case "DELETE":
handler.DelFile(w, r)
}
}))
} else {
s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "HEAD":
addCorsHeader(w)
w.WriteHeader(http.StatusOK)
return
case "GET":
s.doGet(w, r)
return
case "PUT":
s.doPut(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
}
})
}

return err
}

func addCorsHeader(res http.ResponseWriter) {
Expand Down
18 changes: 12 additions & 6 deletions pkg/eosclient/eosclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,12 +591,8 @@ func (c *Client) Read(ctx context.Context, username, path string) (io.ReadCloser
return os.Open(localTarget)
}

// Write writes a file to the mgm
// Write writes a stream to the mgm
func (c *Client) Write(ctx context.Context, username, path string, stream io.ReadCloser) error {
unixUser, err := c.getUnixUser(username)
if err != nil {
return err
}
fd, err := ioutil.TempFile(c.opt.CacheDirectory, "eoswrite-")
if err != nil {
return err
Expand All @@ -609,8 +605,18 @@ func (c *Client) Write(ctx context.Context, username, path string, stream io.Rea
if err != nil {
return err
}

return c.WriteFile(ctx, username, path, fd.Name())
}

// WriteFile writes an existing file to the mgm
func (c *Client) WriteFile(ctx context.Context, username, path, source string) error {
unixUser, err := c.getUnixUser(username)
if err != nil {
return err
}
xrdPath := fmt.Sprintf("%s//%s", c.opt.URL, path)
cmd := exec.CommandContext(ctx, c.opt.XrdcopyBinary, "--nopbar", "--silent", "-f", fd.Name(), xrdPath, fmt.Sprintf("-ODeos.ruid=%s&eos.rgid=%s", unixUser.Uid, unixUser.Gid))
cmd := exec.CommandContext(ctx, c.opt.XrdcopyBinary, "--nopbar", "--silent", "-f", source, xrdPath, fmt.Sprintf("-ODeos.ruid=%s&eos.rgid=%s", unixUser.Uid, unixUser.Gid))
_, _, err = c.execute(ctx, cmd)
return err
}
Expand Down
Loading

0 comments on commit 959ddc6

Please sign in to comment.