From 28969c7875cb09c949036c0349b6d041b1e63304 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Fri, 30 Oct 2020 15:28:10 +0100 Subject: [PATCH] Move chunking assembly logic to fs --- internal/http/services/dataprovider/put.go | 7 +- internal/http/services/owncloud/ocdav/put.go | 31 +------ pkg/errtypes/errtypes.go | 19 +++- .../storage/utils/chunking/chunking.go | 91 ++++++++----------- pkg/storage/utils/eosfs/eosfs.go | 7 +- pkg/storage/utils/eosfs/upload.go | 15 ++- 6 files changed, 84 insertions(+), 86 deletions(-) rename internal/http/services/owncloud/ocdav/putchunked.go => pkg/storage/utils/chunking/chunking.go (77%) diff --git a/internal/http/services/dataprovider/put.go b/internal/http/services/dataprovider/put.go index cc70af5c5c0..b9b1fe9d4d4 100644 --- a/internal/http/services/dataprovider/put.go +++ b/internal/http/services/dataprovider/put.go @@ -24,23 +24,28 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/errtypes" ) func (s *svc) doPut(w http.ResponseWriter, r *http.Request) { ctx := r.Context() log := appctx.GetLogger(ctx) fn := r.URL.Path + defer r.Body.Close() fsfn := strings.TrimPrefix(fn, s.conf.Prefix) ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fsfn}} err := s.storage.Upload(ctx, ref, r.Body) if err != nil { + if _, ok := err.(errtypes.IsPartialContent); ok { + w.WriteHeader(http.StatusPartialContent) + return + } log.Error().Err(err).Msg("error uploading file") w.WriteHeader(http.StatusInternalServerError) return } - r.Body.Close() w.WriteHeader(http.StatusOK) } diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index 5d2afb788a8..5822a67b75e 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -22,7 +22,6 @@ import ( "io" "net/http" "path" - "regexp" "strconv" "time" @@ -35,11 +34,6 @@ import ( "github.com/cs3org/reva/pkg/utils" ) -func isChunked(fn string) (bool, error) { - // FIXME: also need to check whether the OC-Chunked header is set - return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn) -} - func sufferMacOSFinder(r *http.Request) bool { return r.Header.Get("X-Expected-Entity-Length") != "" } @@ -118,27 +112,6 @@ func (s *svc) handlePut(w http.ResponseWriter, r *http.Request, ns string) { return } - ok, err := isChunked(fn) - if err != nil { - log.Error().Err(err).Msg("error checking if request is chunked or not") - w.WriteHeader(http.StatusInternalServerError) - return - } - - if ok { - // TODO: disable if chunking capability is turned off in config - /** - if s.c.Capabilities.Dav.Chunking == "1.0" { - s.handlePutChunked(w, r) - } else { - log.Error().Err(err).Msg("chunking 1.0 is not enabled") - w.WriteHeader(http.StatusBadRequest) - } - */ - s.handlePutChunked(w, r, ns) - return - } - if isContentRange(r) { log.Warn().Msg("Content-Range not supported for PUT") w.WriteHeader(http.StatusNotImplemented) @@ -279,6 +252,10 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io } defer httpRes.Body.Close() if httpRes.StatusCode != http.StatusOK { + if httpRes.StatusCode == http.StatusPartialContent { + w.WriteHeader(http.StatusPartialContent) + return + } log.Err(err).Msg("PUT request to data server failed") w.WriteHeader(http.StatusInternalServerError) return diff --git a/pkg/errtypes/errtypes.go b/pkg/errtypes/errtypes.go index a01a4679fb4..dbd6c351a1e 100644 --- a/pkg/errtypes/errtypes.go +++ b/pkg/errtypes/errtypes.go @@ -35,6 +35,9 @@ type InternalError string func (e InternalError) Error() string { return "internal error: " + string(e) } +// IsInternalError is the method to check for w +func (e InternalError) IsInternalError() {} + // PermissionDenied is the error to use when a resource cannot be access because of missing permissions. type PermissionDenied string @@ -75,6 +78,14 @@ func (e NotSupported) Error() string { return "error: not supported: " + string( // IsNotSupported implements the IsNotSupported interface. func (e NotSupported) IsNotSupported() {} +// PartialContent is the error to use when the client request has partial data. +type PartialContent string + +func (e PartialContent) Error() string { return "error: partial content: " + string(e) } + +// IsPartialContent implements the IsPartialContent interface. +func (e PartialContent) IsPartialContent() {} + // IsNotFound is the interface to implement // to specify that an a resource is not found. type IsNotFound interface { @@ -112,7 +123,13 @@ type IsNotSupported interface { } // IsPermissionDenied is the interface to implement -// to specify that an action is not supported. +// to specify that an action is denied. type IsPermissionDenied interface { IsPermissionDenied() } + +// IsPartialContent is the interface to implement +// to specify that the client request has partial data. +type IsPartialContent interface { + IsPartialContent() +} diff --git a/internal/http/services/owncloud/ocdav/putchunked.go b/pkg/storage/utils/chunking/chunking.go similarity index 77% rename from internal/http/services/owncloud/ocdav/putchunked.go rename to pkg/storage/utils/chunking/chunking.go index f8c3812205b..fbda9484e59 100644 --- a/internal/http/services/owncloud/ocdav/putchunked.go +++ b/pkg/storage/utils/chunking/chunking.go @@ -16,23 +16,30 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -package ocdav +package chunking import ( - "context" "fmt" "io" "io/ioutil" - "net/http" "os" - "path" "path/filepath" + "regexp" "strconv" "strings" - - "github.com/cs3org/reva/pkg/appctx" ) +// ChunkHandler manages chunked uploads, storing the chunks in a temporary directory +// until it gets the final chunk which is then returned. +type ChunkHandler struct { + ChunkFolder string `mapstructure:"chunk_folder"` +} + +// NewChunkHandler creates a handler for chunked uploads. +func NewChunkHandler(chunkFolder string) *ChunkHandler { + return &ChunkHandler{chunkFolder} +} + type chunkBLOBInfo struct { path string transferID string @@ -40,9 +47,8 @@ type chunkBLOBInfo struct { currentChunk int64 } -// not using the resource path in the chunk folder name allows uploading -// to the same folder after a move without having to restart the chunk -// upload +// Not using the resource path in the chunk folder name allows uploading to +// the same folder after a move without having to restart the chunk upload func (c *chunkBLOBInfo) uploadID() string { return fmt.Sprintf("chunking-%s-%d", c.transferID, c.totalChunks) } @@ -72,8 +78,8 @@ func getChunkBLOBInfo(path string) (*chunkBLOBInfo, error) { }, nil } -func (s *svc) createChunkTempFile() (string, *os.File, error) { - file, err := ioutil.TempFile(fmt.Sprintf("/%s", s.c.ChunkFolder), "") +func (c *ChunkHandler) createChunkTempFile() (string, *os.File, error) { + file, err := ioutil.TempFile(fmt.Sprintf("/%s", c.ChunkFolder), "") if err != nil { return "", nil, err } @@ -81,26 +87,22 @@ func (s *svc) createChunkTempFile() (string, *os.File, error) { return file.Name(), file, nil } -func (s *svc) getChunkFolderName(i *chunkBLOBInfo) (string, error) { - path := "/" + s.c.ChunkFolder + filepath.Clean("/"+i.uploadID()) +func (c *ChunkHandler) getChunkFolderName(i *chunkBLOBInfo) (string, error) { + path := "/" + c.ChunkFolder + filepath.Clean("/"+i.uploadID()) if err := os.MkdirAll(path, 0755); err != nil { return "", err } return path, nil } -func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool, string, error) { - log := appctx.GetLogger(ctx) +func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, error) { chunkInfo, err := getChunkBLOBInfo(path) if err != nil { err := fmt.Errorf("error getting chunk info from path: %s", path) return false, "", err } - //c.logger.Info().Log("chunknum", chunkInfo.currentChunk, "chunks", chunkInfo.totalChunks, - //"transferid", chunkInfo.transferID, "uploadid", chunkInfo.uploadID()) - - chunkTempFilename, chunkTempFile, err := s.createChunkTempFile() + chunkTempFilename, chunkTempFile, err := c.createChunkTempFile() if err != nil { return false, "", err } @@ -116,7 +118,7 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool return false, "", err } - chunksFolderName, err := s.getChunkFolderName(chunkInfo) + chunksFolderName, err := c.getChunkFolderName(chunkInfo) if err != nil { return false, "", err } @@ -144,7 +146,7 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool return false, "", err } - // there is still some chunks to be uploaded. + // there are still some chunks to be uploaded. // we return CodeUploadIsPartial to notify upper layers that the upload is still // not complete and requires more actions. // This code is needed to notify the owncloud webservice that the upload has not yet been @@ -153,7 +155,7 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool return false, "", nil } - assembledFileName, assembledFile, err := s.createChunkTempFile() + assembledFileName, assembledFile, err := c.createChunkTempFile() if err != nil { return false, "", err } @@ -183,57 +185,38 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool // at this point the assembled file is complete // so we free space removing the chunks folder - defer func() { - if err = os.RemoveAll(chunksFolderName); err != nil { - log.Warn().Err(err).Msg("error deleting chunk folder, remove folder manually/cron to not leak storage space") - } - }() + defer os.RemoveAll(chunksFolderName) return true, assembledFileName, nil } -func (s *svc) handlePutChunked(w http.ResponseWriter, r *http.Request, ns string) { - ctx := r.Context() - log := appctx.GetLogger(ctx) - - fn := r.URL.Path - if r.Body == nil { - log.Warn().Msg("body is nil") - w.WriteHeader(http.StatusBadRequest) - return - } +func (c *ChunkHandler) IsChunked(fn string) (bool, error) { + // FIXME: also need to check whether the OC-Chunked header is set + return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn) +} - finish, chunk, err := s.saveChunk(ctx, fn, r.Body) +func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCloser, error) { + finish, chunk, err := c.saveChunk(fn, r) if err != nil { - log.Error().Err(err).Msg("error saving chunk") - w.WriteHeader(http.StatusInternalServerError) - return + return "", nil, err } if !finish { - w.WriteHeader(http.StatusPartialContent) - return + return "", nil, nil } fd, err := os.Open(chunk) if err != nil { - log.Error().Err(err).Msg("error opening chunk") - w.WriteHeader(http.StatusInternalServerError) - return + return "", nil, err } defer fd.Close() - md, err := fd.Stat() + chunkInfo, err := getChunkBLOBInfo(fn) if err != nil { - log.Error().Err(err).Msg("error statting chunk") - w.WriteHeader(http.StatusInternalServerError) - return + return "", nil, err } - chunkInfo, _ := getChunkBLOBInfo(fn) - fn = path.Join(applyLayout(ctx, ns), chunkInfo.path) - - s.handlePutHelper(w, r, fd, fn, md.Size()) + return chunkInfo.path, fd, nil // TODO(labkode): implement old chunking diff --git a/pkg/storage/utils/eosfs/eosfs.go b/pkg/storage/utils/eosfs/eosfs.go index b2865c24112..087f1e696f8 100644 --- a/pkg/storage/utils/eosfs/eosfs.go +++ b/pkg/storage/utils/eosfs/eosfs.go @@ -42,6 +42,7 @@ import ( "github.com/cs3org/reva/pkg/sharedconf" "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/utils/acl" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/grants" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" @@ -175,6 +176,7 @@ func (c *Config) init() { type eosfs struct { c *eosclient.Client conf *Config + chunkHandler *chunking.ChunkHandler singleUserUID string singleUserGID string } @@ -208,8 +210,9 @@ func NewEOSFS(c *Config) (storage.FS, error) { eosClient := eosclient.New(eosClientOpts) eosfs := &eosfs{ - c: eosClient, - conf: c, + c: eosClient, + conf: c, + chunkHandler: chunking.NewChunkHandler(c.CacheDirectory), } return eosfs, nil diff --git a/pkg/storage/utils/eosfs/upload.go b/pkg/storage/utils/eosfs/upload.go index 8c97cac1016..8c07d1285bd 100644 --- a/pkg/storage/utils/eosfs/upload.go +++ b/pkg/storage/utils/eosfs/upload.go @@ -46,8 +46,21 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC return errtypes.PermissionDenied("eos: cannot upload under the virtual share folder") } - fn := fs.wrap(ctx, p) + ok, err := fs.chunkHandler.IsChunked(p) + if err != nil { + return errors.Wrap(err, "eos: error resolving reference") + } + if ok { + p, r, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { + return err + } + if p == "" { + return errtypes.PartialContent(ref.String()) + } + } + fn := fs.wrap(ctx, p) return fs.c.Write(ctx, uid, gid, fn, r) }