From e6ee95301b20c9451ddcf4711577c85b87fbbdc6 Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Tue, 19 Sep 2023 15:19:22 +0200 Subject: [PATCH 1/8] Do not cd to user home if user homes are disabled --- pkg/storage/fs/cephfs/connections.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/fs/cephfs/connections.go b/pkg/storage/fs/cephfs/connections.go index 6ffb0dbb7f..757e1f1e1a 100644 --- a/pkg/storage/fs/cephfs/connections.go +++ b/pkg/storage/fs/cephfs/connections.go @@ -212,7 +212,7 @@ func newConn(user *User) *cacheVal { return destroyCephConn(mount, perm) } - if user != nil { + if user != nil && !user.fs.conf.DisableHome { if err = mount.ChangeDir(user.fs.conf.Root); err != nil { return destroyCephConn(mount, perm) } From d867ba4039694f1acaf790e7c9a398ce96874b1d Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Tue, 19 Sep 2023 15:44:33 +0200 Subject: [PATCH 2/8] Simplify uploads --- pkg/storage/fs/cephfs/upload.go | 336 ++------------------------------ 1 file changed, 15 insertions(+), 321 deletions(-) diff --git a/pkg/storage/fs/cephfs/upload.go b/pkg/storage/fs/cephfs/upload.go index 5c6d929e1f..2f330494a1 100644 --- a/pkg/storage/fs/cephfs/upload.go +++ b/pkg/storage/fs/cephfs/upload.go @@ -22,35 +22,19 @@ package cephfs import ( - "bytes" "context" - "encoding/json" "io" "os" - "path/filepath" - cephfs2 "github.com/ceph/go-ceph/cephfs" - userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/cs3org/reva/pkg/appctx" - ctx2 "github.com/cs3org/reva/pkg/ctx" "github.com/cs3org/reva/pkg/errtypes" - "github.com/cs3org/reva/pkg/utils" - "github.com/google/uuid" "github.com/pkg/errors" - tusd "github.com/tus/tusd/pkg/handler" ) func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { user := fs.makeUser(ctx) - upload, err := fs.GetUpload(ctx, ref.GetPath()) - if err != nil { - return errors.Wrap(err, 
"cephfs: error retrieving upload") - } - - uploadInfo := upload.(*fileUpload) + p := ref.GetPath() - p := uploadInfo.info.Storage["InternalDestination"] ok, err := IsChunked(p) if err != nil { return errors.Wrap(err, "cephfs: error checking path") @@ -62,13 +46,8 @@ func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read return err } if p == "" { - if err = uploadInfo.Terminate(ctx); err != nil { - return errors.Wrap(err, "cephfs: error removing auxiliary files") - } return errtypes.PartialContent(ref.String()) } - uploadInfo.info.Storage["InternalDestination"] = p - user.op(func(cv *cacheVal) { r, err = cv.mount.Open(assembledFile, os.O_RDONLY, 0) }) @@ -81,318 +60,33 @@ func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read }) } - if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { - return errors.Wrap(err, "cephfs: error writing to binary file") - } - - return uploadInfo.FinishUpload(ctx) -} - -func (fs *cephfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { - user := fs.makeUser(ctx) - np, err := user.resolveRef(ref) - if err != nil { - return nil, errors.Wrap(err, "cephfs: error resolving reference") - } - - info := tusd.FileInfo{ - MetaData: tusd.MetaData{ - "filename": filepath.Base(np), - "dir": filepath.Dir(np), - }, - Size: uploadLength, - } - - if metadata != nil { - if metadata["mtime"] != "" { - info.MetaData["mtime"] = metadata["mtime"] - } - if _, ok := metadata["sizedeferred"]; ok { - info.SizeIsDeferred = true - } - } - - upload, err := fs.NewUpload(ctx, info) - if err != nil { - return nil, err - } - - info, _ = upload.GetInfo(ctx) - - return map[string]string{ - "simple": info.ID, - "tus": info.ID, - }, nil -} - -// UseIn tells the tus upload middleware which extensions it supports. 
-func (fs *cephfs) UseIn(composer *tusd.StoreComposer) { - composer.UseCore(fs) - composer.UseTerminater(fs) -} - -func (fs *cephfs) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tusd.Upload, err error) { - log := appctx.GetLogger(ctx) - log.Debug().Interface("info", info).Msg("cephfs: NewUpload") - - user := fs.makeUser(ctx) - - fn := info.MetaData["filename"] - if fn == "" { - return nil, errors.New("cephfs: missing filename in metadata") - } - info.MetaData["filename"] = filepath.Clean(info.MetaData["filename"]) - - dir := info.MetaData["dir"] - if dir == "" { - return nil, errors.New("cephfs: missing dir in metadata") - } - info.MetaData["dir"] = filepath.Clean(info.MetaData["dir"]) - - np := filepath.Join(info.MetaData["dir"], info.MetaData["filename"]) - - info.ID = uuid.New().String() - - binPath := fs.getUploadPath(info.ID) - - info.Storage = map[string]string{ - "Type": "Cephfs", - "BinPath": binPath, - "InternalDestination": np, - - "Idp": user.Id.Idp, - "UserId": user.Id.OpaqueId, - "UserName": user.Username, - "UserType": utils.UserTypeToString(user.Id.Type), - - "LogLevel": log.GetLevel().String(), - } - - // Create binary file with no content + var file io.WriteCloser user.op(func(cv *cacheVal) { - var f *cephfs2.File - defer closeFile(f) - f, err = cv.mount.Open(binPath, os.O_CREATE|os.O_WRONLY, fs.conf.FilePerms) + file, err = cv.mount.Open(p, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, fs.conf.FilePerms) if err != nil { + err = errors.Wrap(err, "cephfs: error opening binary file") return } - }) - //TODO: if we get two same upload ids, the second one can't upload at all - if err != nil { - return - } - - upload = &fileUpload{ - info: info, - binPath: binPath, - infoPath: binPath + ".info", - fs: fs, - ctx: ctx, - } - - // writeInfo creates the file by itself if necessary - err = upload.(*fileUpload).writeInfo() - - return -} - -func (fs *cephfs) getUploadPath(uploadID string) string { - return filepath.Join(fs.conf.UploadFolder, uploadID) -} - 
-// GetUpload returns the Upload for the given upload id -func (fs *cephfs) GetUpload(ctx context.Context, id string) (fup tusd.Upload, err error) { - binPath := fs.getUploadPath(id) - info := tusd.FileInfo{} - if err != nil { - return nil, errtypes.NotFound("bin path for upload " + id + " not found") - } - infoPath := binPath + ".info" - - var data bytes.Buffer - f, err := fs.adminConn.adminMount.Open(infoPath, os.O_RDONLY, 0) - if err != nil { - return - } - _, err = io.Copy(&data, f) - if err != nil { - return - } - if err = json.Unmarshal(data.Bytes(), &info); err != nil { - return - } - - u := &userpb.User{ - Id: &userpb.UserId{ - Idp: info.Storage["Idp"], - OpaqueId: info.Storage["UserId"], - }, - Username: info.Storage["UserName"], - } - ctx = ctx2.ContextSetUser(ctx, u) - user := fs.makeUser(ctx) - - var stat Statx - user.op(func(cv *cacheVal) { - stat, err = cv.mount.Statx(binPath, cephfs2.StatxSize, 0) - }) - if err != nil { - return - } - info.Offset = int64(stat.Size) - - return &fileUpload{ - info: info, - binPath: binPath, - infoPath: infoPath, - fs: fs, - ctx: ctx, - }, nil -} - -type fileUpload struct { - // info stores the current information about the upload - info tusd.FileInfo - // infoPath is the path to the .info file - infoPath string - // binPath is the path to the binary file (which has no extension) - binPath string - // only fs knows how to handle metadata and versions - fs *cephfs - // a context with a user - ctx context.Context -} - -// GetInfo returns the FileInfo -func (upload *fileUpload) GetInfo(ctx context.Context) (tusd.FileInfo, error) { - return upload.info, nil -} - -// GetReader returns an io.Reader for the upload -func (upload *fileUpload) GetReader(ctx context.Context) (file io.Reader, err error) { - user := upload.fs.makeUser(upload.ctx) - user.op(func(cv *cacheVal) { - file, err = cv.mount.Open(upload.binPath, os.O_RDONLY, 0) - }) - return -} - -// WriteChunk writes the stream from the reader to the given offset of the 
upload -func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (n int64, err error) { - var file io.WriteCloser - user := upload.fs.makeUser(upload.ctx) - user.op(func(cv *cacheVal) { - file, err = cv.mount.Open(upload.binPath, os.O_WRONLY|os.O_APPEND, 0) - }) - if err != nil { - return 0, err - } - defer file.Close() - - n, err = io.Copy(file, src) - - // If the HTTP PATCH request gets interrupted in the middle (e.g. because - // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. - // However, for OwnCloudStore it's not important whether the stream has ended - // on purpose or accidentally. - if err != nil { - if err != io.ErrUnexpectedEOF { - return n, err - } - } - - upload.info.Offset += n - err = upload.writeInfo() - - return n, err -} - -// writeInfo updates the entire information. Everything will be overwritten. -func (upload *fileUpload) writeInfo() error { - data, err := json.Marshal(upload.info) + defer file.Close() - if err != nil { - return err - } - user := upload.fs.makeUser(upload.ctx) - user.op(func(cv *cacheVal) { - var file io.WriteCloser - if file, err = cv.mount.Open(upload.infoPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, upload.fs.conf.FilePerms); err != nil { + _, err = io.Copy(file, r) + if err != nil { + err = errors.Wrap(err, "cephfs: error writing to binary file") return } - defer file.Close() - - _, err = io.Copy(file, bytes.NewReader(data)) }) return err } -// FinishUpload finishes an upload and moves the file to the internal destination -func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { - - np := upload.info.Storage["InternalDestination"] - - // TODO check etag with If-Match header - // if destination exists - // if _, err := os.Stat(np); err == nil { - // the local storage does not store metadata - // the fileid is based on the path, so no we do not need to copy it to the new file - // the local storage does not track revisions - // } - - // if 
destination exists - // if _, err := os.Stat(np); err == nil { - // create revision - // if err := upload.fs.archiveRevision(upload.ctx, np); err != nil { - // return err - // } - // } - - user := upload.fs.makeUser(upload.ctx) - log := appctx.GetLogger(ctx) - - user.op(func(cv *cacheVal) { - err = cv.mount.Rename(upload.binPath, np) - }) - if err != nil { - return errors.Wrap(err, upload.binPath) - } - - // only delete the upload if it was successfully written to the fs - user.op(func(cv *cacheVal) { - err = cv.mount.Unlink(upload.infoPath) - }) +func (fs *cephfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { + user := fs.makeUser(ctx) + np, err := user.resolveRef(ref) if err != nil { - if err.Error() != errNotFound { - log.Err(err).Interface("info", upload.info).Msg("cephfs: could not delete upload metadata") - } + return nil, errors.Wrap(err, "cephfs: error resolving reference") } - // TODO: set mtime if specified in metadata - - return -} - -// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination -// - the storage needs to implement AsTerminatableUpload -// - the upload needs to implement Terminate - -// AsTerminatableUpload returns a a TerminatableUpload -func (fs *cephfs) AsTerminatableUpload(upload tusd.Upload) tusd.TerminatableUpload { - return upload.(*fileUpload) -} - -// Terminate terminates the upload -func (upload *fileUpload) Terminate(ctx context.Context) (err error) { - user := upload.fs.makeUser(upload.ctx) - - user.op(func(cv *cacheVal) { - if err = cv.mount.Unlink(upload.infoPath); err != nil { - return - } - err = cv.mount.Unlink(upload.binPath) - }) - - return + return map[string]string{ + "simple": np, + }, nil } From aae0b8b5dc91b7da1e63620f939d67344a77c4de Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Tue, 19 Sep 2023 15:46:19 +0200 Subject: [PATCH 3/8] Stat shadow folders before 
creating them --- pkg/storage/fs/cephfs/cephfs.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/storage/fs/cephfs/cephfs.go b/pkg/storage/fs/cephfs/cephfs.go index e9e175aff4..fc1828bc08 100644 --- a/pkg/storage/fs/cephfs/cephfs.go +++ b/pkg/storage/fs/cephfs/cephfs.go @@ -62,7 +62,7 @@ func init() { registry.Register("cephfs", New) } -// New returns an implementation to of the storage.FS interface that talk to +// New returns an implementation of the storage.FS interface that talks to // a ceph filesystem. func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err error) { var o Options @@ -81,9 +81,12 @@ func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err erro } for _, dir := range []string{o.ShadowFolder, o.UploadFolder} { - err = adminConn.adminMount.MakeDir(dir, dirPermFull) - if err != nil && err.Error() != errFileExists { - return nil, errors.New("cephfs: can't initialise system dir " + dir + ":" + err.Error()) + _, err := adminConn.adminMount.Statx(dir, cephfs2.StatxMask(cephfs2.StatxIno), 0) + if err != nil { + err = adminConn.adminMount.MakeDir(dir, dirPermFull) + if err != nil && err.Error() != errFileExists { + return nil, errors.New("cephfs: can't initialise system dir " + dir + ":" + err.Error()) + } } } From 2b7cfbb8e50fa8902408baf4977532e1823f0461 Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Tue, 19 Sep 2023 15:47:39 +0200 Subject: [PATCH 4/8] Clean up chunked upload --- pkg/storage/fs/cephfs/chunking.go | 122 +----------------------------- 1 file changed, 1 insertion(+), 121 deletions(-) diff --git a/pkg/storage/fs/cephfs/chunking.go b/pkg/storage/fs/cephfs/chunking.go index fc9dbe0bbd..9929834132 100644 --- a/pkg/storage/fs/cephfs/chunking.go +++ b/pkg/storage/fs/cephfs/chunking.go @@ -108,9 +108,7 @@ func (c *ChunkHandler) getChunkFolderName(i *ChunkBLOBInfo) (path string, err er } func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, 
chunk string, err error) { - var chunkInfo *ChunkBLOBInfo - - chunkInfo, err = GetChunkBLOBInfo(path) + chunkInfo, err := GetChunkBLOBInfo(path) if err != nil { err = fmt.Errorf("error getting chunk info from path: %s", path) return @@ -223,122 +221,4 @@ func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, string, e } return chunkInfo.Path, chunk, nil - - // TODO(labkode): implement old chunking - - /* - req2 := &provider.StartWriteSessionRequest{} - res2, err := client.StartWriteSession(ctx, req2) - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if res2.Status.Code != rpc.Code_CODE_OK { - logger.Println(ctx, res2.Status) - w.WriteHeader(http.StatusInternalServerError) - return - } - - sessID := res2.SessionId - logger.Build().Str("sessID", sessID).Msg(ctx, "got write session id") - - stream, err := client.Write(ctx) - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - buffer := make([]byte, 1024*1024*3) - var offset uint64 - var numChunks uint64 - - for { - n, err := fd.Read(buffer) - if n > 0 { - req := &provider.WriteRequest{Data: buffer, Length: uint64(n), SessionId: sessID, Offset: offset} - err = stream.Send(req) - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - numChunks++ - offset += uint64(n) - } - - if err == io.EOF { - break - } - - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - } - - res3, err := stream.CloseAndRecv() - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if res3.Status.Code != rpc.Code_CODE_OK { - logger.Println(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - req4 := &provider.FinishWriteSessionRequest{Filename: chunkInfo.path, SessionId: sessID} - res4, err := client.FinishWriteSession(ctx, req4) - if err != nil { 
- logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if res4.Status.Code != rpc.Code_CODE_OK { - logger.Println(ctx, res4.Status) - w.WriteHeader(http.StatusInternalServerError) - return - } - - req.Filename = chunkInfo.path - res, err = client.Stat(ctx, req) - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if res.Status.Code != rpc.Code_CODE_OK { - logger.Println(ctx, res.Status) - w.WriteHeader(http.StatusInternalServerError) - return - } - - md2 := res.Metadata - - w.Header().Add("Content-Type", md2.Mime) - w.Header().Set("ETag", md2.Etag) - w.Header().Set("OC-FileId", md2.Id) - w.Header().Set("OC-ETag", md2.Etag) - t := time.Unix(int64(md2.Mtime), 0) - lastModifiedString := t.Format(time.RFC1123Z) - w.Header().Set("Last-Modified", lastModifiedString) - w.Header().Set("X-OC-MTime", "accepted") - - if md == nil { - w.WriteHeader(http.StatusCreated) - return - } - - w.WriteHeader(http.StatusNoContent) - return - */ } From adf94993942de1cc6d26ea877d34e42a704ea5c2 Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Tue, 19 Sep 2023 17:10:16 +0200 Subject: [PATCH 5/8] Fix panic on shutdown --- pkg/storage/fs/cephfs/connections.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/fs/cephfs/connections.go b/pkg/storage/fs/cephfs/connections.go index 757e1f1e1a..acc21ed757 100644 --- a/pkg/storage/fs/cephfs/connections.go +++ b/pkg/storage/fs/cephfs/connections.go @@ -63,7 +63,7 @@ func newCache() (c *connections, err error) { MaxCost: usrLimit, BufferItems: 64, OnEvict: func(item *ristretto.Item) { - v := item.Value.(cacheVal) + v := item.Value.(*cacheVal) v.perm.Destroy() _ = v.mount.Unmount() _ = v.mount.Release() From 3cf929fa43f9fc793878e5de1c6a89446e12461b Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Thu, 21 Sep 2023 09:12:16 +0200 Subject: [PATCH 6/8] Changelog --- changelog/unreleased/fix-ceph-driver.md | 9 +++++++++ 1 file changed, 9 
insertions(+) create mode 100644 changelog/unreleased/fix-ceph-driver.md diff --git a/changelog/unreleased/fix-ceph-driver.md b/changelog/unreleased/fix-ceph-driver.md new file mode 100644 index 0000000000..310d7b6159 --- /dev/null +++ b/changelog/unreleased/fix-ceph-driver.md @@ -0,0 +1,9 @@ +Enhancement: Multiple fixes for Ceph driver + +* Avoid usage/creation of user homes when they are disabled in the config +* Simplify the regular uploads (not chunked) +* Avoid creation of shadow folders at the root if they are already there +* Clean up the chunked upload +* Fix panic on shutdown + +https://github.com/cs3org/reva/pull/4200 From 349e33cb1ee5c971f4eb724556f08b3d75a27e60 Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Mon, 25 Sep 2023 11:19:32 +0200 Subject: [PATCH 7/8] Check type before cache eviction Co-authored-by: Gianmaria Del Monte --- pkg/storage/fs/cephfs/connections.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/storage/fs/cephfs/connections.go b/pkg/storage/fs/cephfs/connections.go index acc21ed757..8f35860eb1 100644 --- a/pkg/storage/fs/cephfs/connections.go +++ b/pkg/storage/fs/cephfs/connections.go @@ -63,10 +63,11 @@ func newCache() (c *connections, err error) { MaxCost: usrLimit, BufferItems: 64, OnEvict: func(item *ristretto.Item) { - v := item.Value.(*cacheVal) - v.perm.Destroy() - _ = v.mount.Unmount() - _ = v.mount.Release() + if v, ok := item.Value.(*cacheVal); ok { + v.perm.Destroy() + _ = v.mount.Unmount() + _ = v.mount.Release() + } }, }) if err != nil { From 4f495a0a4b3c5aea696699da64c93c4c283f079a Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Tue, 26 Sep 2023 10:00:46 +0200 Subject: [PATCH 8/8] Continue on stat permission denied --- pkg/storage/fs/cephfs/user.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/storage/fs/cephfs/user.go b/pkg/storage/fs/cephfs/user.go index 71e5f436ac..cb016fbc99 100644 --- a/pkg/storage/fs/cephfs/user.go +++ 
b/pkg/storage/fs/cephfs/user.go @@ -105,6 +105,9 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *cephfs2.Ce _type = provider.ResourceType_RESOURCE_TYPE_CONTAINER if buf, err = cv.mount.GetXattr(path, "ceph.dir.rbytes"); err == nil { size, err = strconv.ParseUint(string(buf), 10, 64) + } else if err.Error() == errPermissionDenied { + // Ignore permission denied errors so ListFolder does not fail because of them. + err = nil } case syscall.S_IFLNK: _type = provider.ResourceType_RESOURCE_TYPE_SYMLINK @@ -116,10 +119,6 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *cephfs2.Ce return nil, errors.New("cephfs: unknown entry type") } - if err != nil { - return - } - var xattrs []string keys := make(map[string]bool, len(mdKeys)) for _, key := range mdKeys {