From e1d32b8a5ddd3bf2fd85c328eb4290f8ae0b00b2 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Fri, 30 Oct 2020 17:09:22 +0100 Subject: [PATCH] Add chunking support for local, owncloud and ocis fs --- pkg/storage/fs/ocis/ocis.go | 19 +++++++++++-------- pkg/storage/fs/ocis/upload.go | 22 ++++++++++++++++++++++ pkg/storage/fs/owncloud/owncloud.go | 12 +++++++++--- pkg/storage/fs/owncloud/upload.go | 22 ++++++++++++++++++++++ pkg/storage/utils/chunking/chunking.go | 12 ++++++------ pkg/storage/utils/eosfs/upload.go | 2 +- pkg/storage/utils/localfs/localfs.go | 12 +++++++++--- pkg/storage/utils/localfs/upload.go | 23 +++++++++++++++++++++++ 8 files changed, 103 insertions(+), 21 deletions(-) diff --git a/pkg/storage/fs/ocis/ocis.go b/pkg/storage/fs/ocis/ocis.go index 8fc1da1d75f..59f092997b6 100644 --- a/pkg/storage/fs/ocis/ocis.go +++ b/pkg/storage/fs/ocis/ocis.go @@ -33,6 +33,7 @@ import ( "github.com/cs3org/reva/pkg/logger" "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" "github.com/mitchellh/mapstructure" @@ -160,18 +161,20 @@ func New(m map[string]interface{}) (storage.FS, error) { } return &ocisfs{ - tp: tp, - lu: lu, - o: o, - p: &Permissions{lu: lu}, + tp: tp, + lu: lu, + o: o, + p: &Permissions{lu: lu}, + chunkHandler: chunking.NewChunkHandler(filepath.Join(o.Root, "uploads")), }, nil } type ocisfs struct { - tp TreePersistence - lu *Lookup - o *Options - p *Permissions + tp TreePersistence + lu *Lookup + o *Options + p *Permissions + chunkHandler *chunking.ChunkHandler } func (fs *ocisfs) Shutdown(ctx context.Context) error { diff --git a/pkg/storage/fs/ocis/upload.go b/pkg/storage/fs/ocis/upload.go index a42954387ea..d45354d0401 100644 --- a/pkg/storage/fs/ocis/upload.go +++ b/pkg/storage/fs/ocis/upload.go @@ -32,6 +32,7 @@ import ( "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/logger" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/user" "github.com/google/uuid" "github.com/pkg/errors" @@ -48,6 +49,27 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read } uploadInfo := upload.(*fileUpload) + + p := uploadInfo.info.Storage["InternalDestination"] + ok, err := chunking.IsChunked(p) + if err != nil { + return errors.Wrap(err, "ocfs: error checking path") + } + if ok { + p, r, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { + return err + } + if p == "" { + if err = uploadInfo.Terminate(ctx); err != nil { + return errors.Wrap(err, "ocfs: error removing auxiliary files") + } + return errtypes.PartialContent(ref.String()) + } + uploadInfo.info.Storage["InternalDestination"] = p + defer r.Close() + } + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { return errors.Wrap(err, "ocisfs: error writing to binary file") } diff --git a/pkg/storage/fs/owncloud/owncloud.go b/pkg/storage/fs/owncloud/owncloud.go index ab32fe1f427..6e08cf40471 100644 --- a/pkg/storage/fs/owncloud/owncloud.go +++ b/pkg/storage/fs/owncloud/owncloud.go @@ -44,6 +44,7 @@ import ( "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" "github.com/cs3org/reva/pkg/storage/utils/ace" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" "github.com/gofrs/uuid" @@ -198,12 +199,17 @@ func New(m map[string]interface{}) (storage.FS, error) { }, } - return &ocfs{c: c, pool: pool}, nil + return &ocfs{ + c: c, + pool: pool, + chunkHandler: chunking.NewChunkHandler(c.UploadInfoDir), + }, nil } type ocfs struct { - c *config - pool *redis.Pool + c *config + pool *redis.Pool + chunkHandler *chunking.ChunkHandler } func (fs *ocfs) Shutdown(ctx context.Context) error { diff --git a/pkg/storage/fs/owncloud/upload.go b/pkg/storage/fs/owncloud/upload.go index 508f5e8b61a..9e647403585 100644 --- a/pkg/storage/fs/owncloud/upload.go +++ b/pkg/storage/fs/owncloud/upload.go @@ -31,6 +31,7 @@ import ( "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/logger" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" "github.com/google/uuid" @@ -47,6 +48,27 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl } uploadInfo := upload.(*fileUpload) + + p := uploadInfo.info.Storage["InternalDestination"] + ok, err := chunking.IsChunked(p) + if err != nil { + return errors.Wrap(err, "ocfs: error checking path") + } + if ok { + p, r, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { + return err + } + if p == "" { + if err = uploadInfo.Terminate(ctx); err != nil { + return errors.Wrap(err, "ocfs: error removing auxiliary files") + } + return errtypes.PartialContent(ref.String()) + } + uploadInfo.info.Storage["InternalDestination"] = p + defer r.Close() + } + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { return errors.Wrap(err, "ocfs: error writing to binary file") } diff --git a/pkg/storage/utils/chunking/chunking.go b/pkg/storage/utils/chunking/chunking.go index 13df7103eb8..9877f9d653a 100644 --- a/pkg/storage/utils/chunking/chunking.go +++ b/pkg/storage/utils/chunking/chunking.go @@ -39,8 +39,8 @@ func IsChunked(fn string) (bool, error) { type ChunkBLOBInfo struct { Path string TransferID string - TotalChunks int64 - CurrentChunk int64 + TotalChunks int + CurrentChunk int } // Not using the resource path in the chunk folder name allows uploading to @@ -54,12 +54,12 @@ func GetChunkBLOBInfo(path string) (*ChunkBLOBInfo, error) { parts := strings.Split(path, "-chunking-") tail := strings.Split(parts[1], "-") - totalChunks, err := strconv.ParseInt(tail[1], 10, 64) + totalChunks, err := strconv.Atoi(tail[1]) if err != nil { return nil, err } - currentChunk, err := strconv.ParseInt(tail[2], 10, 64) + currentChunk, err := strconv.Atoi(tail[2]) if err != nil { return nil, err } @@ -159,7 +159,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er // not complete and requires more actions. // This code is needed to notify the owncloud webservice that the upload has not yet been // completed and needs to continue uploading chunks. - if len(chunks) < int(chunkInfo.TotalChunks) { + if len(chunks) < chunkInfo.TotalChunks { return false, "", nil } @@ -198,7 +198,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er return true, assembledFileName, nil } -// Write chunk saves an intermediate chunk temporarily and assembles all chunks +// WriteChunk saves an intermediate chunk temporarily and assembles all chunks // once the final one is received. func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCloser, error) { finish, chunk, err := c.saveChunk(fn, r) diff --git a/pkg/storage/utils/eosfs/upload.go b/pkg/storage/utils/eosfs/upload.go index f86dcc22178..a73197c723e 100644 --- a/pkg/storage/utils/eosfs/upload.go +++ b/pkg/storage/utils/eosfs/upload.go @@ -49,7 +49,7 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC ok, err := chunking.IsChunked(p) if err != nil { - return errors.Wrap(err, "eos: error resolving reference") + return errors.Wrap(err, "eos: error checking path") } if ok { p, r, err = fs.chunkHandler.WriteChunk(p, r) diff --git a/pkg/storage/utils/localfs/localfs.go b/pkg/storage/utils/localfs/localfs.go index 5f643499d53..909b1acd9d3 100644 --- a/pkg/storage/utils/localfs/localfs.go +++ b/pkg/storage/utils/localfs/localfs.go @@ -37,6 +37,7 @@ import ( "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/mime" "github.com/cs3org/reva/pkg/storage" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/storage/utils/grants" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/user" @@ -84,8 +85,9 @@ func (c *Config) init() { } type localfs struct { - conf *Config - db *sql.DB + conf *Config + db *sql.DB + chunkHandler *chunking.ChunkHandler } // NewLocalFS returns a storage.FS interface implementation that controls then @@ -111,7 +113,11 @@ func NewLocalFS(c *Config) (storage.FS, error) { return nil, errors.Wrap(err, "localfs: error initializing db") } - return &localfs{conf: c, db: db}, nil + return &localfs{ + conf: c, + db: db, + chunkHandler: chunking.NewChunkHandler(c.Uploads), + }, nil } func (fs *localfs) Shutdown(ctx context.Context) error { diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index bffa861dd8c..fe4b68fb208 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -29,6 +29,8 @@ import ( userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/storage/utils/chunking" "github.com/cs3org/reva/pkg/user" "github.com/google/uuid" "github.com/pkg/errors" @@ -44,6 +46,27 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea } uploadInfo := upload.(*fileUpload) + + p := uploadInfo.info.Storage["InternalDestination"] + ok, err := chunking.IsChunked(p) + if err != nil { + return errors.Wrap(err, "ocfs: error checking path") + } + if ok { + p, r, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { + return err + } + if p == "" { + if err = uploadInfo.Terminate(ctx); err != nil { + return errors.Wrap(err, "ocfs: error removing auxiliary files") + } + return errtypes.PartialContent(ref.String()) + } + uploadInfo.info.Storage["InternalDestination"] = p + defer r.Close() + } + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { return errors.Wrap(err, "ocisfs: error writing to binary file") }