Skip to content

Commit

Permalink
Convert io.Reader to io.ReadSeeker to pass to tus client
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 committed Oct 21, 2020
1 parent 4ff3015 commit b1f8529
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 80 deletions.
25 changes: 3 additions & 22 deletions cmd/reva/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/cs3org/reva/internal/http/services/datagateway"
"github.com/pkg/errors"

"github.com/cheggaaa/pb"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
Expand Down Expand Up @@ -148,14 +147,9 @@ func uploadCommand() *command {

dataServerURL := res.UploadEndpoint

bar := pb.New(int(md.Size())).SetUnits(pb.U_BYTES)
bar.Start()
reader := bar.NewProxyReader(fd)

if *disableTusFlag {
httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, reader)
httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, fd)
if err != nil {
bar.Finish()
return err
}

Expand All @@ -175,12 +169,10 @@ func uploadCommand() *command {

httpRes, err := httpClient.Do(httpReq)
if err != nil {
bar.Finish()
return err
}
defer httpRes.Body.Close()
if httpRes.StatusCode != http.StatusOK {
bar.Finish()
return err
}
} else {
Expand All @@ -196,7 +188,6 @@ func uploadCommand() *command {
)
c.Store, err = memorystore.NewMemoryStore()
if err != nil {
bar.Finish()
return err
}
if token, ok := tokenpkg.ContextGetToken(ctx); ok {
Expand All @@ -207,7 +198,6 @@ func uploadCommand() *command {
}
tusc, err := tus.NewClient(dataServerURL, c)
if err != nil {
bar.Finish()
return err
}

Expand All @@ -220,7 +210,7 @@ func uploadCommand() *command {
fingerprint := fmt.Sprintf("%s-%d-%s-%s", md.Name(), md.Size(), md.ModTime(), xs)

// create an upload from a file.
upload := tus.NewUpload(reader, md.Size(), metadata, fingerprint)
upload := tus.NewUpload(fd, md.Size(), metadata, fingerprint)

// create the uploader.
c.Store.Set(upload.Fingerprint, dataServerURL)
Expand All @@ -229,13 +219,10 @@ func uploadCommand() *command {
// start the uploading process.
err = uploader.Upload()
if err != nil {
bar.Finish()
return err
}
}

bar.Finish()

req2 := &provider.StatRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Expand Down Expand Up @@ -290,21 +277,15 @@ func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInf
return errors.New("opaque entry decoder not recognized: " + fileOpaque.Decoder)
}

bar := pb.New(int(md.Size())).SetUnits(pb.U_BYTES)
bar.Start()
reader := bar.NewProxyReader(fd)

c := gowebdav.NewClient(endpoint, "", "")
c.SetHeader(tokenpkg.TokenHeader, token)
c.SetHeader("Upload-Length", strconv.FormatInt(md.Size(), 10))

err := c.WriteStream(filePath, reader, 0700)
err := c.WriteStream(filePath, fd, 0700)
if err != nil {
bar.Finish()
return err
}

bar.Finish()
fmt.Println("File uploaded")
return nil
}
Expand Down
17 changes: 11 additions & 6 deletions internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ func init() {
}

type config struct {
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure"`
DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."`
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure"`
DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."`
TempDirectory string `mapstructure:"temp_directory"`
}

func (c *config) init() {
Expand All @@ -53,6 +54,10 @@ func (c *config) init() {
c.Driver = "localhome"
}

if c.TempDirectory == "" {
c.TempDirectory = "/var/tmp/reva/tmp"
}

}

type svc struct {
Expand Down
15 changes: 15 additions & 0 deletions internal/http/services/dataprovider/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ package dataprovider

import (
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"strconv"
"strings"
Expand Down Expand Up @@ -73,6 +76,18 @@ func (s *svc) doTusPut(w http.ResponseWriter, r *http.Request) {
}
}

fd, err := ioutil.TempFile(fmt.Sprintf("/%s", s.conf.TempDirectory), "")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
defer os.RemoveAll(fd.Name())
defer fd.Close()
if _, err := io.Copy(fd, r.Body); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

dataServerURL := fmt.Sprintf("http://%s%s", r.Host, r.RequestURI)

// create the tus client.
Expand Down
15 changes: 13 additions & 2 deletions internal/http/services/owncloud/ocdav/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"path"
"strings"

Expand Down Expand Up @@ -293,16 +294,26 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src
return fmt.Errorf("status code %d", httpDownloadRes.StatusCode)
}

fileName, fd, err := s.createChunkTempFile()
if err != nil {
return err
}
defer os.RemoveAll(fileName)
defer fd.Close()
if _, err := io.Copy(fd, httpDownloadRes.Body); err != nil {
return err
}

// do upload
err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, dst, httpDownloadRes.Body, src.GetSize())
err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, dst, fd, int64(src.GetSize()))
if err != nil {
return err
}
}
return nil
}

func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken string, fn string, body io.Reader, length uint64) error {
func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken string, fn string, body io.ReadSeeker, length int64) error {
var err error
log := appctx.GetLogger(ctx)

Expand Down
52 changes: 2 additions & 50 deletions internal/http/services/owncloud/ocdav/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@ import (
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/internal/http/services/datagateway"
"github.com/cs3org/reva/internal/http/utils"
"github.com/cs3org/reva/pkg/appctx"
tokenpkg "github.com/cs3org/reva/pkg/token"
"github.com/eventials/go-tus"
"github.com/eventials/go-tus/memorystore"
)

func isChunked(fn string) (bool, error) {
Expand Down Expand Up @@ -283,52 +279,8 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io
return
}

dataServerURL := uRes.UploadEndpoint

// create the tus client.
c := tus.DefaultConfig()
c.Resume = true
c.HttpClient = s.client

c.Store, err = memorystore.NewMemoryStore()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

log.Debug().
Str("upload-endpoint", dataServerURL).
Str("auth-header", tokenpkg.TokenHeader).
Str("auth-token", tokenpkg.ContextMustGetToken(ctx)).
Str("transfer-header", datagateway.TokenTransportHeader).
Str("transfer-token", uRes.Token).
Msg("adding tokens to headers")
c.Header.Set(tokenpkg.TokenHeader, tokenpkg.ContextMustGetToken(ctx))
c.Header.Set(datagateway.TokenTransportHeader, uRes.Token)

tusc, err := tus.NewClient(dataServerURL, c)
if err != nil {
log.Error().Err(err).Msg("Could not get TUS client")
w.WriteHeader(http.StatusInternalServerError)
return
}

metadata := map[string]string{
"filename": path.Base(fn),
"dir": path.Dir(fn),
//"checksum": fmt.Sprintf("%s %s", storageprovider.GRPC2PKGXS(xsType).String(), xs),
}

upload := tus.NewUpload(content, length, metadata, "")

// create the uploader.
c.Store.Set(upload.Fingerprint, dataServerURL)
uploader := tus.NewUploader(tusc, dataServerURL, upload, 0)

// start the uploading process.
err = uploader.Upload()
if err != nil {
log.Error().Err(err).Msg("Could not start TUS upload")
if err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, fn, content, length); err != nil {
log.Error().Err(err).Msg("TUS upload failed")
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down

0 comments on commit b1f8529

Please sign in to comment.