From b1f8529b8d14f808f4cf47e68de1637641bee484 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Wed, 21 Oct 2020 14:09:01 +0200 Subject: [PATCH] Convert io.Reader to io.ReadSeeker to pass to tus client --- cmd/reva/upload.go | 25 ++------- .../services/dataprovider/dataprovider.go | 17 +++--- internal/http/services/dataprovider/put.go | 15 ++++++ internal/http/services/owncloud/ocdav/copy.go | 15 +++++- internal/http/services/owncloud/ocdav/put.go | 52 +------------------ 5 files changed, 44 insertions(+), 80 deletions(-) diff --git a/cmd/reva/upload.go b/cmd/reva/upload.go index 9dd5a2619ae..83216945a93 100644 --- a/cmd/reva/upload.go +++ b/cmd/reva/upload.go @@ -31,7 +31,6 @@ import ( "github.com/cs3org/reva/internal/http/services/datagateway" "github.com/pkg/errors" - "github.com/cheggaaa/pb" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" @@ -148,14 +147,9 @@ func uploadCommand() *command { dataServerURL := res.UploadEndpoint - bar := pb.New(int(md.Size())).SetUnits(pb.U_BYTES) - bar.Start() - reader := bar.NewProxyReader(fd) - if *disableTusFlag { - httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, reader) + httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, fd) if err != nil { - bar.Finish() return err } @@ -175,12 +169,10 @@ func uploadCommand() *command { httpRes, err := httpClient.Do(httpReq) if err != nil { - bar.Finish() return err } defer httpRes.Body.Close() if httpRes.StatusCode != http.StatusOK { - bar.Finish() return err } } else { @@ -196,7 +188,6 @@ func uploadCommand() *command { ) c.Store, err = memorystore.NewMemoryStore() if err != nil { - bar.Finish() return err } if token, ok := tokenpkg.ContextGetToken(ctx); ok { @@ -207,7 +198,6 @@ func uploadCommand() *command { } tusc, err := tus.NewClient(dataServerURL, c) if err != nil { - bar.Finish() return err } @@ -220,7 +210,7 @@ func uploadCommand() *command { fingerprint := fmt.Sprintf("%s-%d-%s-%s", md.Name(), md.Size(), md.ModTime(), xs) // create an upload from a file. - upload := tus.NewUpload(reader, md.Size(), metadata, fingerprint) + upload := tus.NewUpload(fd, md.Size(), metadata, fingerprint) // create the uploader. c.Store.Set(upload.Fingerprint, dataServerURL) @@ -229,13 +219,10 @@ func uploadCommand() *command { // start the uploading process. err = uploader.Upload() if err != nil { - bar.Finish() return err } } - bar.Finish() - req2 := &provider.StatRequest{ Ref: &provider.Reference{ Spec: &provider.Reference_Path{ @@ -290,21 +277,15 @@ func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInf return errors.New("opaque entry decoder not recognized: " + fileOpaque.Decoder) } - bar := pb.New(int(md.Size())).SetUnits(pb.U_BYTES) - bar.Start() - reader := bar.NewProxyReader(fd) - c := gowebdav.NewClient(endpoint, "", "") c.SetHeader(tokenpkg.TokenHeader, token) c.SetHeader("Upload-Length", strconv.FormatInt(md.Size(), 10)) - err := c.WriteStream(filePath, reader, 0700) + err := c.WriteStream(filePath, fd, 0700) if err != nil { - bar.Finish() return err } - bar.Finish() fmt.Println("File uploaded") return nil } diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index 46a472fb0fa..7b62ea64d97 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -36,12 +36,13 @@ func init() { } type config struct { - Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"` - Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."` - Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"` - Timeout int64 `mapstructure:"timeout"` - Insecure bool `mapstructure:"insecure"` - DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."` + Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"` + Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."` + Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"` + Timeout int64 `mapstructure:"timeout"` + Insecure bool `mapstructure:"insecure"` + DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."` + TempDirectory string `mapstructure:"temp_directory"` } func (c *config) init() { @@ -53,6 +54,10 @@ func (c *config) init() { c.Driver = "localhome" } + if c.TempDirectory == "" { + c.TempDirectory = "/var/tmp/reva/tmp" + } + } type svc struct { diff --git a/internal/http/services/dataprovider/put.go b/internal/http/services/dataprovider/put.go index 1bcbfbb65b0..300433893d2 100644 --- a/internal/http/services/dataprovider/put.go +++ b/internal/http/services/dataprovider/put.go @@ -20,7 +20,10 @@ package dataprovider import ( "fmt" + "io" + "io/ioutil" "net/http" + "os" "path" "strconv" "strings" @@ -73,6 +76,18 @@ func (s *svc) doTusPut(w http.ResponseWriter, r *http.Request) { } } + fd, err := ioutil.TempFile(fmt.Sprintf("/%s", s.conf.TempDirectory), "") + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + defer os.RemoveAll(fd.Name()) + defer fd.Close() + if _, err := io.Copy(fd, r.Body); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + dataServerURL := fmt.Sprintf("http://%s%s", r.Host, r.RequestURI) // create the tus client. diff --git a/internal/http/services/owncloud/ocdav/copy.go b/internal/http/services/owncloud/ocdav/copy.go index 032ff708410..c2578e71be8 100644 --- a/internal/http/services/owncloud/ocdav/copy.go +++ b/internal/http/services/owncloud/ocdav/copy.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "net/http" + "os" "path" "strings" @@ -293,8 +294,18 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src return fmt.Errorf("status code %d", httpDownloadRes.StatusCode) } + fileName, fd, err := s.createChunkTempFile() + if err != nil { + return err + } + defer os.RemoveAll(fileName) + defer fd.Close() + if _, err := io.Copy(fd, httpDownloadRes.Body); err != nil { + return err + } + // do upload - err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, dst, httpDownloadRes.Body, src.GetSize()) + err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, dst, fd, int64(src.GetSize())) if err != nil { return err } @@ -302,7 +313,7 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src return nil } -func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken string, fn string, body io.Reader, length uint64) error { +func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken string, fn string, body io.ReadSeeker, length int64) error { var err error log := appctx.GetLogger(ctx) diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index c339f4c6e5d..e4ddfadaa5d 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -30,12 +30,8 @@ import ( rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" - "github.com/cs3org/reva/internal/http/services/datagateway" "github.com/cs3org/reva/internal/http/utils" "github.com/cs3org/reva/pkg/appctx" - tokenpkg "github.com/cs3org/reva/pkg/token" - "github.com/eventials/go-tus" - "github.com/eventials/go-tus/memorystore" ) func isChunked(fn string) (bool, error) { @@ -283,52 +279,8 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io return } - dataServerURL := uRes.UploadEndpoint - - // create the tus client. - c := tus.DefaultConfig() - c.Resume = true - c.HttpClient = s.client - - c.Store, err = memorystore.NewMemoryStore() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - log.Debug(). - Str("upload-endpoint", dataServerURL). - Str("auth-header", tokenpkg.TokenHeader). - Str("auth-token", tokenpkg.ContextMustGetToken(ctx)). - Str("transfer-header", datagateway.TokenTransportHeader). - Str("transfer-token", uRes.Token). - Msg("adding tokens to headers") - c.Header.Set(tokenpkg.TokenHeader, tokenpkg.ContextMustGetToken(ctx)) - c.Header.Set(datagateway.TokenTransportHeader, uRes.Token) - - tusc, err := tus.NewClient(dataServerURL, c) - if err != nil { - log.Error().Err(err).Msg("Could not get TUS client") - w.WriteHeader(http.StatusInternalServerError) - return - } - - metadata := map[string]string{ - "filename": path.Base(fn), - "dir": path.Dir(fn), - //"checksum": fmt.Sprintf("%s %s", storageprovider.GRPC2PKGXS(xsType).String(), xs), - } - - upload := tus.NewUpload(content, length, metadata, "") - - // create the uploader. - c.Store.Set(upload.Fingerprint, dataServerURL) - uploader := tus.NewUploader(tusc, dataServerURL, upload, 0) - - // start the uploading process. - err = uploader.Upload() - if err != nil { - log.Error().Err(err).Msg("Could not start TUS upload") + if err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, fn, content, length); err != nil { + log.Error().Err(err).Msg("TUS upload failed") w.WriteHeader(http.StatusInternalServerError) return }