From e83e52b7b5671645fb5ff1221bed92a0003dc50f Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Thu, 2 Nov 2023 12:06:46 +0100 Subject: [PATCH 1/5] Revert changes to ceph driver --- changelog/unreleased/recover-ceph-driver.md | 6 + pkg/storage/fs/cephfs/cephfs.go | 233 ++------------ pkg/storage/fs/cephfs/chunking.go | 132 +------- pkg/storage/fs/cephfs/connections.go | 25 +- pkg/storage/fs/cephfs/errors.go | 2 +- pkg/storage/fs/cephfs/permissions.go | 4 +- pkg/storage/fs/cephfs/upload.go | 336 +------------------- pkg/storage/fs/cephfs/user.go | 13 +- pkg/storage/fs/cephfs/utils.go | 14 +- 9 files changed, 78 insertions(+), 687 deletions(-) create mode 100644 changelog/unreleased/recover-ceph-driver.md diff --git a/changelog/unreleased/recover-ceph-driver.md b/changelog/unreleased/recover-ceph-driver.md new file mode 100644 index 0000000000..58d4dcef70 --- /dev/null +++ b/changelog/unreleased/recover-ceph-driver.md @@ -0,0 +1,6 @@ +Bugfix: Restore changes to ceph driver + +PR [4166](https://github.com/cs3org/reva/pull/4166) accidentally reverted the +ceph driver changes. This bugfix recovers them. + +https://github.com/cs3org/reva/pull/???? diff --git a/pkg/storage/fs/cephfs/cephfs.go b/pkg/storage/fs/cephfs/cephfs.go index bdc4bf73cb..fc1828bc08 100644 --- a/pkg/storage/fs/cephfs/cephfs.go +++ b/pkg/storage/fs/cephfs/cephfs.go @@ -23,25 +23,20 @@ package cephfs import ( "context" - b64 "encoding/base64" - "encoding/json" "fmt" - "hash/fnv" "io" "net/url" "os" "path/filepath" "strconv" "strings" - "time" - goceph "github.com/ceph/go-ceph/cephfs" + cephfs2 "github.com/ceph/go-ceph/cephfs" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" - "github.com/cs3org/reva/pkg/utils" "github.com/cs3org/reva/pkg/utils/cfg" "github.com/pkg/errors" ) @@ -54,7 +49,6 @@ const ( xattrRef = xattrTrustedNs + "ref" xattrUserNs = "user." snap = ".snap" - xattrLock = xattrUserNs + "reva.lockpayload" ) type cephfs struct { @@ -68,7 +62,7 @@ func init() { registry.Register("cephfs", New) } -// New returns an implementation to of the storage.FS interface that talk to +// New returns an implementation to of the storage.FS interface that talks to // a ceph filesystem. func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err error) { var o Options @@ -87,9 +81,12 @@ func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err erro } for _, dir := range []string{o.ShadowFolder, o.UploadFolder} { - err = adminConn.adminMount.MakeDir(dir, dirPermFull) - if err != nil && err.Error() != errFileExists { - return nil, errors.New("cephfs: can't initialise system dir " + dir + ":" + err.Error()) + _, err := adminConn.adminMount.Statx(dir, cephfs2.StatxMask(cephfs2.StatxIno), 0) + if err != nil { + err = adminConn.adminMount.MakeDir(dir, dirPermFull) + if err != nil && err.Error() != errFileExists { + return nil, errors.New("cephfs: can't initialise system dir " + dir + ":" + err.Error()) + } } } @@ -118,7 +115,7 @@ func (fs *cephfs) CreateHome(ctx context.Context) (err error) { user := fs.makeUser(ctx) // Stop createhome from running the whole thing because it is called multiple times - if _, err = fs.adminConn.adminMount.Statx(user.home, goceph.StatxMode, 0); err == nil { + if _, err = fs.adminConn.adminMount.Statx(user.home, cephfs2.StatxMode, 0); err == nil { return } @@ -227,7 +224,7 @@ func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []s user.op(func(cv *cacheVal) { var stat Statx - if stat, err = cv.mount.Statx(path, goceph.StatxBasicStats, 0); err != nil { + if stat, err = cv.mount.Statx(path, cephfs2.StatxBasicStats, 0); err != nil { return } ri, err = user.fileAsResourceInfo(cv, path, stat, mdKeys) @@ -251,15 +248,15 @@ func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKey } user.op(func(cv *cacheVal) { - var dir *goceph.Directory + var dir *cephfs2.Directory if dir, err = cv.mount.OpenDir(path); err != nil { return } defer closeDir(dir) - var entry *goceph.DirEntryPlus + var entry *cephfs2.DirEntryPlus var ri *provider.ResourceInfo - for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) { + for entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0) { if fs.conf.HiddenDirs[entry.Name()] { continue } @@ -312,7 +309,7 @@ func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (f err = errtypes.PermissionDenied("cephfs: cannot download under the virtual share folder") return } - var dir *goceph.Directory + var dir *cephfs2.Directory if dir, err = cv.mount.OpenDir(".snap"); err != nil { return } @@ -331,7 +328,7 @@ func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (f if e != nil { continue } - stat, e = cv.mount.Statx(revPath, goceph.StatxMtime|goceph.StatxSize, 0) + stat, e = cv.mount.Statx(revPath, cephfs2.StatxMtime|cephfs2.StatxSize, 0) if e != nil { continue } @@ -378,7 +375,7 @@ func (fs *cephfs) RestoreRevision(ctx context.Context, ref *provider.Reference, return } - var src, dst *goceph.File + var src, dst *cephfs2.File if src, err = cv.mount.Open(revPath, os.O_RDONLY, 0); err != nil { return } @@ -581,7 +578,7 @@ func (fs *cephfs) TouchFile(ctx context.Context, ref *provider.Reference) error } user.op(func(cv *cacheVal) { - var file *goceph.File + var file *cephfs2.File defer closeFile(file) if file, err = cv.mount.Open(path, os.O_CREATE|os.O_WRONLY, fs.conf.FilePerms); err != nil { return @@ -621,204 +618,18 @@ func (fs *cephfs) UpdateStorageSpace(ctx context.Context, req *provider.UpdateSt return nil, errtypes.NotSupported("unimplemented") } -var fnvHash = fnv.New32a() - -func getHash(s string) uint64 { - fnvHash.Write([]byte(s)) - defer fnvHash.Reset() - return uint64(fnvHash.Sum32()) -} - -func encodeLock(l *provider.Lock) string { - data, _ := json.Marshal(l) - return b64.StdEncoding.EncodeToString(data) -} - -func decodeLock(content string) (*provider.Lock, error) { - d, err := b64.StdEncoding.DecodeString(content) - if err != nil { - return nil, err - } - - l := &provider.Lock{} - if err = json.Unmarshal(d, l); err != nil { - return nil, err - } - - return l, nil -} - func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { - user := fs.makeUser(ctx) - path, err := user.resolveRef(ref) - if err != nil { - return getRevaError(err) - } - - op := goceph.LockEX - if lock.Type == provider.LockType_LOCK_TYPE_SHARED { - op = goceph.LockSH - } - - user.op(func(cv *cacheVal) { - var file *goceph.File - defer closeFile(file) - if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { - return - } - - err = file.Flock(op|goceph.LockNB, getHash(lock.AppName)) - }) - - if err == nil { - // ok, we got the flock, now also store the related lock metadata - md := &provider.ArbitraryMetadata{ - Metadata: map[string]string{ - xattrLock: encodeLock(lock), - }, - } - return fs.SetArbitraryMetadata(ctx, ref, md) - } - - return getRevaError(err) + return errtypes.NotSupported("unimplemented") } func (fs *cephfs) GetLock(ctx context.Context, ref *provider.Reference) (*provider.Lock, error) { - user := fs.makeUser(ctx) - path, err := user.resolveRef(ref) - if err != nil { - return nil, getRevaError(err) - } - - var l *provider.Lock - user.op(func(cv *cacheVal) { - buf, errXattr := cv.mount.GetXattr(path, xattrLock) - if errXattr == nil { - if l, err = decodeLock(string(buf)); err != nil { - err = errors.Wrap(err, "malformed lock payload") - return - } - } - - var file *goceph.File - defer closeFile(file) - if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { - // try and open with read-only permissions: if this succeeds, we just return - // the metadata as is, otherwise we return the error on Open() - if file, err = cv.mount.Open(path, os.O_RDONLY, fs.conf.FilePerms); err != nil { - l = nil - } - return - } - - if err = file.Flock(goceph.LockEX|goceph.LockNB, 0); err == nil { - // success means the file was not locked, drop related metadata if present - if l != nil { - fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) - l = nil - } - file.Flock(goceph.LockUN|goceph.LockNB, 0) - err = errtypes.NotFound("file was not locked") - return - } - - if errXattr != nil { - // error here means we have a "foreign" flock with no CS3 metadata - err = nil - l = new(provider.Lock) - l.AppName = "External" - return - } - - if time.Unix(int64(l.Expiration.Seconds), 0).After(time.Now()) { - // the lock expired, drop - fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) - file.Flock(goceph.LockUN|goceph.LockNB, getHash(l.AppName)) - err = errtypes.NotFound("file was not locked") - l = nil - } - return - }) - - return l, getRevaError(err) -} - -// TODO(lopresti) part of this logic is duplicated from eosfs.go, should be factored out -func sameHolder(l1, l2 *provider.Lock) bool { - same := true - if l1.User != nil || l2.User != nil { - same = utils.UserEqual(l1.User, l2.User) - } - if l1.AppName != "" || l2.AppName != "" { - same = l1.AppName == l2.AppName - } - return same + return nil, errtypes.NotSupported("unimplemented") } -func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, newLock *provider.Lock, existingLockID string) error { - oldLock, err := fs.GetLock(ctx, ref) - if err != nil { - switch err.(type) { - case errtypes.NotFound: - // the lock does not exist - return errtypes.BadRequest("file was not locked") - default: - return err - } - } - - // check if the holder is the same of the new lock - if !sameHolder(oldLock, newLock) { - return errtypes.BadRequest("caller does not hold the lock") - } - - if existingLockID != "" && oldLock.LockId != existingLockID { - return errtypes.BadRequest("lock id does not match") - } - - return fs.SetLock(ctx, ref, newLock) +func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock, existingLockID string) error { + return errtypes.NotSupported("unimplemented") } func (fs *cephfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { - user := fs.makeUser(ctx) - path, err := user.resolveRef(ref) - if err != nil { - return getRevaError(err) - } - - oldLock, err := fs.GetLock(ctx, ref) - if err != nil { - switch err.(type) { - case errtypes.NotFound: - // the lock does not exist - return errtypes.BadRequest("file not found or not locked") - default: - return err - } - } - - // check if the lock id of the lock corresponds to the stored lock - if oldLock.LockId != lock.LockId { - return errtypes.BadRequest("lock id does not match") - } - - if !sameHolder(oldLock, lock) { - return errtypes.BadRequest("caller does not hold the lock") - } - - user.op(func(cv *cacheVal) { - var file *goceph.File - defer closeFile(file) - if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { - return - } - - err = file.Flock(goceph.LockUN|goceph.LockNB, getHash(lock.AppName)) - }) - - if err == nil { - return fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) - } - - return getRevaError(err) + return errtypes.NotSupported("unimplemented") } diff --git a/pkg/storage/fs/cephfs/chunking.go b/pkg/storage/fs/cephfs/chunking.go index 9ea8f37a14..9929834132 100644 --- a/pkg/storage/fs/cephfs/chunking.go +++ b/pkg/storage/fs/cephfs/chunking.go @@ -32,7 +32,7 @@ import ( "strings" "time" - goceph "github.com/ceph/go-ceph/cephfs" + cephfs2 "github.com/ceph/go-ceph/cephfs" "github.com/google/uuid" ) @@ -108,9 +108,7 @@ func (c *ChunkHandler) getChunkFolderName(i *ChunkBLOBInfo) (path string, err er } func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chunk string, err error) { - var chunkInfo *ChunkBLOBInfo - - chunkInfo, err = GetChunkBLOBInfo(path) + chunkInfo, err := GetChunkBLOBInfo(path) if err != nil { err = fmt.Errorf("error getting chunk info from path: %s", path) return @@ -118,7 +116,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu chunkTempFilename := c.getChunkTempFileName() c.user.op(func(cv *cacheVal) { - var tmpFile *goceph.File + var tmpFile *cephfs2.File target := filepath.Join(c.chunkFolder, chunkTempFilename) tmpFile, err = cv.mount.Open(target, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms) defer closeFile(tmpFile) @@ -152,9 +150,9 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu // assembly the chunks when the client asks for it. numEntries := 0 c.user.op(func(cv *cacheVal) { - var dir *goceph.Directory - var entry *goceph.DirEntry - var chunkFile, assembledFile *goceph.File + var dir *cephfs2.Directory + var entry *cephfs2.DirEntry + var chunkFile, assembledFile *cephfs2.File dir, err = cv.mount.OpenDir(chunksFolderName) defer closeDir(dir) @@ -223,122 +221,4 @@ func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, string, e } return chunkInfo.Path, chunk, nil - - // TODO(labkode): implement old chunking - - /* - req2 := &provider.StartWriteSessionRequest{} - res2, err := client.StartWriteSession(ctx, req2) - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if res2.Status.Code != rpc.Code_CODE_OK { - logger.Println(ctx, res2.Status) - w.WriteHeader(http.StatusInternalServerError) - return - } - - sessID := res2.SessionId - logger.Build().Str("sessID", sessID).Msg(ctx, "got write session id") - - stream, err := client.Write(ctx) - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - buffer := make([]byte, 1024*1024*3) - var offset uint64 - var numChunks uint64 - - for { - n, err := fd.Read(buffer) - if n > 0 { - req := &provider.WriteRequest{Data: buffer, Length: uint64(n), SessionId: sessID, Offset: offset} - err = stream.Send(req) - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - numChunks++ - offset += uint64(n) - } - - if err == io.EOF { - break - } - - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - } - - res3, err := stream.CloseAndRecv() - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if res3.Status.Code != rpc.Code_CODE_OK { - logger.Println(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - req4 := &provider.FinishWriteSessionRequest{Filename: chunkInfo.path, SessionId: sessID} - res4, err := client.FinishWriteSession(ctx, req4) - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if res4.Status.Code != rpc.Code_CODE_OK { - logger.Println(ctx, res4.Status) - w.WriteHeader(http.StatusInternalServerError) - return - } - - req.Filename = chunkInfo.path - res, err = client.Stat(ctx, req) - if err != nil { - logger.Error(ctx, err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if res.Status.Code != rpc.Code_CODE_OK { - logger.Println(ctx, res.Status) - w.WriteHeader(http.StatusInternalServerError) - return - } - - md2 := res.Metadata - - w.Header().Add("Content-Type", md2.Mime) - w.Header().Set("ETag", md2.Etag) - w.Header().Set("OC-FileId", md2.Id) - w.Header().Set("OC-ETag", md2.Etag) - t := time.Unix(int64(md2.Mtime), 0) - lastModifiedString := t.Format(time.RFC1123Z) - w.Header().Set("Last-Modified", lastModifiedString) - w.Header().Set("X-OC-MTime", "accepted") - - if md == nil { - w.WriteHeader(http.StatusCreated) - return - } - - w.WriteHeader(http.StatusNoContent) - return - */ } diff --git a/pkg/storage/fs/cephfs/connections.go b/pkg/storage/fs/cephfs/connections.go index 117c39eade..8f35860eb1 100644 --- a/pkg/storage/fs/cephfs/connections.go +++ b/pkg/storage/fs/cephfs/connections.go @@ -34,14 +34,14 @@ import ( "github.com/cs3org/reva/pkg/rgrpc/todo/pool" "github.com/pkg/errors" - goceph "github.com/ceph/go-ceph/cephfs" + cephfs2 "github.com/ceph/go-ceph/cephfs" "github.com/dgraph-io/ristretto" "golang.org/x/sync/semaphore" ) type cacheVal struct { - perm *goceph.UserPerm - mount *goceph.MountInfo + perm *cephfs2.UserPerm + mount *cephfs2.MountInfo } //TODO: Add to cephfs obj @@ -63,10 +63,11 @@ func newCache() (c *connections, err error) { MaxCost: usrLimit, BufferItems: 64, OnEvict: func(item *ristretto.Item) { - v := item.Value.(cacheVal) - v.perm.Destroy() - _ = v.mount.Unmount() - _ = v.mount.Release() + if v, ok := item.Value.(*cacheVal); ok { + v.perm.Destroy() + _ = v.mount.Unmount() + _ = v.mount.Release() + } }, }) if err != nil { @@ -162,7 +163,7 @@ func newAdminConn(conf *Options) *adminConn { } */ - mount, err := goceph.CreateFromRados(rados) + mount, err := cephfs2.CreateFromRados(rados) if err != nil { rados.Shutdown() return nil @@ -184,8 +185,8 @@ func newAdminConn(conf *Options) *adminConn { } func newConn(user *User) *cacheVal { - var perm *goceph.UserPerm - mount, err := goceph.CreateMountWithId(user.fs.conf.ClientID) + var perm *cephfs2.UserPerm + mount, err := cephfs2.CreateMountWithId(user.fs.conf.ClientID) if err != nil { return destroyCephConn(mount, perm) } @@ -202,7 +203,7 @@ func newConn(user *User) *cacheVal { } if user != nil { //nil creates admin conn - perm = goceph.NewUserPerm(int(user.UidNumber), int(user.GidNumber), []int{}) + perm = cephfs2.NewUserPerm(int(user.UidNumber), int(user.GidNumber), []int{}) if err = mount.SetMountPerms(perm); err != nil { return destroyCephConn(mount, perm) } @@ -212,7 +213,7 @@ func newConn(user *User) *cacheVal { return destroyCephConn(mount, perm) } - if user != nil { + if user != nil && !user.fs.conf.DisableHome { if err = mount.ChangeDir(user.fs.conf.Root); err != nil { return destroyCephConn(mount, perm) } diff --git a/pkg/storage/fs/cephfs/errors.go b/pkg/storage/fs/cephfs/errors.go index 2e92575390..02f97bc1f5 100644 --- a/pkg/storage/fs/cephfs/errors.go +++ b/pkg/storage/fs/cephfs/errors.go @@ -51,7 +51,7 @@ func getRevaError(err error) error { } switch err.Error() { case errNotFound: - return errtypes.NotFound("cephfs: entry not found") + return errtypes.NotFound("cephfs: dir entry not found") case errPermissionDenied: return errtypes.PermissionDenied("cephfs: permission denied") case errFileExists: diff --git a/pkg/storage/fs/cephfs/permissions.go b/pkg/storage/fs/cephfs/permissions.go index e2d78de375..d9fc1ce9cc 100644 --- a/pkg/storage/fs/cephfs/permissions.go +++ b/pkg/storage/fs/cephfs/permissions.go @@ -29,7 +29,7 @@ import ( userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" - goceph "github.com/ceph/go-ceph/cephfs" + cephfs2 "github.com/ceph/go-ceph/cephfs" grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/maxymania/go-system/posix_acl" @@ -65,7 +65,7 @@ const ( var op2int = map[rune]uint16{'r': 4, 'w': 2, 'x': 1} -func getPermissionSet(user *User, stat *goceph.CephStatx, mount Mount, path string) (perm *provider.ResourcePermissions) { +func getPermissionSet(user *User, stat *cephfs2.CephStatx, mount Mount, path string) (perm *provider.ResourcePermissions) { perm = &provider.ResourcePermissions{} if int64(stat.Uid) == user.UidNumber || int64(stat.Gid) == user.GidNumber { diff --git a/pkg/storage/fs/cephfs/upload.go b/pkg/storage/fs/cephfs/upload.go index d27baf8377..2f330494a1 100644 --- a/pkg/storage/fs/cephfs/upload.go +++ b/pkg/storage/fs/cephfs/upload.go @@ -22,35 +22,19 @@ package cephfs import ( - "bytes" "context" - "encoding/json" "io" "os" - "path/filepath" - goceph "github.com/ceph/go-ceph/cephfs" - userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/cs3org/reva/pkg/appctx" - ctx2 "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" - "github.com/cs3org/reva/pkg/utils" - "github.com/google/uuid" "github.com/pkg/errors" - tusd "github.com/tus/tusd/pkg/handler" ) func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { user := fs.makeUser(ctx) - upload, err := fs.GetUpload(ctx, ref.GetPath()) - if err != nil { - return errors.Wrap(err, "cephfs: error retrieving upload") - } - - uploadInfo := upload.(*fileUpload) + p := ref.GetPath() - p := uploadInfo.info.Storage["InternalDestination"] ok, err := IsChunked(p) if err != nil { return errors.Wrap(err, "cephfs: error checking path") @@ -62,13 +46,8 @@ func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read return err } if p == "" { - if err = uploadInfo.Terminate(ctx); err != nil { - return errors.Wrap(err, "cephfs: error removing auxiliary files") - } return errtypes.PartialContent(ref.String()) } - uploadInfo.info.Storage["InternalDestination"] = p - user.op(func(cv *cacheVal) { r, err = cv.mount.Open(assembledFile, os.O_RDONLY, 0) }) @@ -81,318 +60,33 @@ func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read }) } - if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { - return errors.Wrap(err, "cephfs: error writing to binary file") - } - - return uploadInfo.FinishUpload(ctx) -} - -func (fs *cephfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { - user := fs.makeUser(ctx) - np, err := user.resolveRef(ref) - if err != nil { - return nil, errors.Wrap(err, "cephfs: error resolving reference") - } - - info := tusd.FileInfo{ - MetaData: tusd.MetaData{ - "filename": filepath.Base(np), - "dir": filepath.Dir(np), - }, - Size: uploadLength, - } - - if metadata != nil { - if metadata["mtime"] != "" { - info.MetaData["mtime"] = metadata["mtime"] - } - if _, ok := metadata["sizedeferred"]; ok { - info.SizeIsDeferred = true - } - } - - upload, err := fs.NewUpload(ctx, info) - if err != nil { - return nil, err - } - - info, _ = upload.GetInfo(ctx) - - return map[string]string{ - "simple": info.ID, - "tus": info.ID, - }, nil -} - -// UseIn tells the tus upload middleware which extensions it supports. -func (fs *cephfs) UseIn(composer *tusd.StoreComposer) { - composer.UseCore(fs) - composer.UseTerminater(fs) -} - -func (fs *cephfs) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tusd.Upload, err error) { - log := appctx.GetLogger(ctx) - log.Debug().Interface("info", info).Msg("cephfs: NewUpload") - - user := fs.makeUser(ctx) - - fn := info.MetaData["filename"] - if fn == "" { - return nil, errors.New("cephfs: missing filename in metadata") - } - info.MetaData["filename"] = filepath.Clean(info.MetaData["filename"]) - - dir := info.MetaData["dir"] - if dir == "" { - return nil, errors.New("cephfs: missing dir in metadata") - } - info.MetaData["dir"] = filepath.Clean(info.MetaData["dir"]) - - np := filepath.Join(info.MetaData["dir"], info.MetaData["filename"]) - - info.ID = uuid.New().String() - - binPath := fs.getUploadPath(info.ID) - - info.Storage = map[string]string{ - "Type": "Cephfs", - "BinPath": binPath, - "InternalDestination": np, - - "Idp": user.Id.Idp, - "UserId": user.Id.OpaqueId, - "UserName": user.Username, - "UserType": utils.UserTypeToString(user.Id.Type), - - "LogLevel": log.GetLevel().String(), - } - - // Create binary file with no content + var file io.WriteCloser user.op(func(cv *cacheVal) { - var f *goceph.File - defer closeFile(f) - f, err = cv.mount.Open(binPath, os.O_CREATE|os.O_WRONLY, fs.conf.FilePerms) + file, err = cv.mount.Open(p, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, fs.conf.FilePerms) if err != nil { + err = errors.Wrap(err, "cephfs: error opening binary file") return } - }) - //TODO: if we get two same upload ids, the second one can't upload at all - if err != nil { - return - } - - upload = &fileUpload{ - info: info, - binPath: binPath, - infoPath: binPath + ".info", - fs: fs, - ctx: ctx, - } - - // writeInfo creates the file by itself if necessary - err = upload.(*fileUpload).writeInfo() - - return -} - -func (fs *cephfs) getUploadPath(uploadID string) string { - return filepath.Join(fs.conf.UploadFolder, uploadID) -} - -// GetUpload returns the Upload for the given upload id -func (fs *cephfs) GetUpload(ctx context.Context, id string) (fup tusd.Upload, err error) { - binPath := fs.getUploadPath(id) - info := tusd.FileInfo{} - if err != nil { - return nil, errtypes.NotFound("bin path for upload " + id + " not found") - } - infoPath := binPath + ".info" - - var data bytes.Buffer - f, err := fs.adminConn.adminMount.Open(infoPath, os.O_RDONLY, 0) - if err != nil { - return - } - _, err = io.Copy(&data, f) - if err != nil { - return - } - if err = json.Unmarshal(data.Bytes(), &info); err != nil { - return - } - - u := &userpb.User{ - Id: &userpb.UserId{ - Idp: info.Storage["Idp"], - OpaqueId: info.Storage["UserId"], - }, - Username: info.Storage["UserName"], - } - ctx = ctx2.ContextSetUser(ctx, u) - user := fs.makeUser(ctx) - - var stat Statx - user.op(func(cv *cacheVal) { - stat, err = cv.mount.Statx(binPath, goceph.StatxSize, 0) - }) - if err != nil { - return - } - info.Offset = int64(stat.Size) - - return &fileUpload{ - info: info, - binPath: binPath, - infoPath: infoPath, - fs: fs, - ctx: ctx, - }, nil -} - -type fileUpload struct { - // info stores the current information about the upload - info tusd.FileInfo - // infoPath is the path to the .info file - infoPath string - // binPath is the path to the binary file (which has no extension) - binPath string - // only fs knows how to handle metadata and versions - fs *cephfs - // a context with a user - ctx context.Context -} - -// GetInfo returns the FileInfo -func (upload *fileUpload) GetInfo(ctx context.Context) (tusd.FileInfo, error) { - return upload.info, nil -} - -// GetReader returns an io.Reader for the upload -func (upload *fileUpload) GetReader(ctx context.Context) (file io.Reader, err error) { - user := upload.fs.makeUser(upload.ctx) - user.op(func(cv *cacheVal) { - file, err = cv.mount.Open(upload.binPath, os.O_RDONLY, 0) - }) - return -} - -// WriteChunk writes the stream from the reader to the given offset of the upload -func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (n int64, err error) { - var file io.WriteCloser - user := upload.fs.makeUser(upload.ctx) - user.op(func(cv *cacheVal) { - file, err = cv.mount.Open(upload.binPath, os.O_WRONLY|os.O_APPEND, 0) - }) - if err != nil { - return 0, err - } - defer file.Close() - - n, err = io.Copy(file, src) - - // If the HTTP PATCH request gets interrupted in the middle (e.g. because - // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. - // However, for OwnCloudStore it's not important whether the stream has ended - // on purpose or accidentally. - if err != nil { - if err != io.ErrUnexpectedEOF { - return n, err - } - } - - upload.info.Offset += n - err = upload.writeInfo() - - return n, err -} - -// writeInfo updates the entire information. Everything will be overwritten. -func (upload *fileUpload) writeInfo() error { - data, err := json.Marshal(upload.info) + defer file.Close() - if err != nil { - return err - } - user := upload.fs.makeUser(upload.ctx) - user.op(func(cv *cacheVal) { - var file io.WriteCloser - if file, err = cv.mount.Open(upload.infoPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, upload.fs.conf.FilePerms); err != nil { + _, err = io.Copy(file, r) + if err != nil { + err = errors.Wrap(err, "cephfs: error writing to binary file") return } - defer file.Close() - - _, err = io.Copy(file, bytes.NewReader(data)) }) return err } -// FinishUpload finishes an upload and moves the file to the internal destination -func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { - - np := upload.info.Storage["InternalDestination"] - - // TODO check etag with If-Match header - // if destination exists - // if _, err := os.Stat(np); err == nil { - // the local storage does not store metadata - // the fileid is based on the path, so no we do not need to copy it to the new file - // the local storage does not track revisions - // } - - // if destination exists - // if _, err := os.Stat(np); err == nil { - // create revision - // if err := upload.fs.archiveRevision(upload.ctx, np); err != nil { - // return err - // } - // } - - user := upload.fs.makeUser(upload.ctx) - log := appctx.GetLogger(ctx) - - user.op(func(cv *cacheVal) { - err = cv.mount.Rename(upload.binPath, np) - }) - if err != nil { - return errors.Wrap(err, upload.binPath) - } - - // only delete the upload if it was successfully written to the fs - user.op(func(cv *cacheVal) { - err = cv.mount.Unlink(upload.infoPath) - }) +func (fs *cephfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { + user := fs.makeUser(ctx) + np, err := user.resolveRef(ref) if err != nil { - if err.Error() != errNotFound { - log.Err(err).Interface("info", upload.info).Msg("cephfs: could not delete upload metadata") - } + return nil, errors.Wrap(err, "cephfs: error resolving reference") } - // TODO: set mtime if specified in metadata - - return -} - -// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination -// - the storage needs to implement AsTerminatableUpload -// - the upload needs to implement Terminate - -// AsTerminatableUpload returns a a TerminatableUpload -func (fs *cephfs) AsTerminatableUpload(upload tusd.Upload) tusd.TerminatableUpload { - return upload.(*fileUpload) -} - -// Terminate terminates the upload -func (upload *fileUpload) Terminate(ctx context.Context) (err error) { - user := upload.fs.makeUser(upload.ctx) - - user.op(func(cv *cacheVal) { - if err = cv.mount.Unlink(upload.infoPath); err != nil { - return - } - err = cv.mount.Unlink(upload.binPath) - }) - - return + return map[string]string{ + "simple": np, + }, nil } diff --git a/pkg/storage/fs/cephfs/user.go b/pkg/storage/fs/cephfs/user.go index 605148f4a2..cb016fbc99 100644 --- a/pkg/storage/fs/cephfs/user.go +++ b/pkg/storage/fs/cephfs/user.go @@ -31,11 +31,11 @@ import ( "github.com/cs3org/reva/pkg/errtypes" - goceph "github.com/ceph/go-ceph/cephfs" + cephfs2 "github.com/ceph/go-ceph/cephfs" userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" - ctx2 "github.com/cs3org/reva/pkg/appctx" + ctx2 "github.com/cs3org/reva/pkg/ctx" "github.com/cs3org/reva/pkg/mime" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/pkg/errors" @@ -92,7 +92,7 @@ func (user *User) op(cb callBack) { cb(val.(*cacheVal)) } -func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.CephStatx, mdKeys []string) (ri *provider.ResourceInfo, err error) { +func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *cephfs2.CephStatx, mdKeys []string) (ri *provider.ResourceInfo, err error) { var ( _type provider.ResourceType target string @@ -105,6 +105,9 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.Cep _type = provider.ResourceType_RESOURCE_TYPE_CONTAINER if buf, err = cv.mount.GetXattr(path, "ceph.dir.rbytes"); err == nil { size, err = strconv.ParseUint(string(buf), 10, 64) + } else if err.Error() == errPermissionDenied { + // Ignore permission denied errors so ListFolder does not fail because of them. + err = nil } case syscall.S_IFLNK: _type = provider.ResourceType_RESOURCE_TYPE_SYMLINK @@ -116,10 +119,6 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.Cep return nil, errors.New("cephfs: unknown entry type") } - if err != nil { - return - } - var xattrs []string keys := make(map[string]bool, len(mdKeys)) for _, key := range mdKeys { diff --git a/pkg/storage/fs/cephfs/utils.go b/pkg/storage/fs/cephfs/utils.go index b54c1f03ee..db0e27c759 100644 --- a/pkg/storage/fs/cephfs/utils.go +++ b/pkg/storage/fs/cephfs/utils.go @@ -30,33 +30,33 @@ import ( "path/filepath" "strconv" - goceph "github.com/ceph/go-ceph/cephfs" + cephfs2 "github.com/ceph/go-ceph/cephfs" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" ) // Mount type -type Mount = *goceph.MountInfo +type Mount = *cephfs2.MountInfo // Statx type -type Statx = *goceph.CephStatx +type Statx = *cephfs2.CephStatx var dirPermFull = uint32(0777) var dirPermDefault = uint32(0700) var filePermDefault = uint32(0640) -func closeDir(directory *goceph.Directory) { +func closeDir(directory *cephfs2.Directory) { if directory != nil { _ = directory.Close() } } -func closeFile(file *goceph.File) { +func closeFile(file *cephfs2.File) { if file != nil { _ = file.Close() } } -func destroyCephConn(mt Mount, perm *goceph.UserPerm) *cacheVal { +func destroyCephConn(mt Mount, perm *cephfs2.UserPerm) *cacheVal { if perm != nil { perm.Destroy() } @@ -66,7 +66,7 @@ func destroyCephConn(mt Mount, perm *goceph.UserPerm) *cacheVal { return nil } -func deleteFile(mount *goceph.MountInfo, path string) { +func deleteFile(mount *cephfs2.MountInfo, path string) { _ = mount.Unlink(path) } From 0bddc6e802889169e626cca2efefb069630c7df3 Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Thu, 2 Nov 2023 12:10:38 +0100 Subject: [PATCH 2/5] Remove old context package --- pkg/storage/fs/cephfs/user.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/fs/cephfs/user.go b/pkg/storage/fs/cephfs/user.go index cb016fbc99..0ab2fb163d 100644 --- a/pkg/storage/fs/cephfs/user.go +++ b/pkg/storage/fs/cephfs/user.go @@ -29,13 +29,13 @@ import ( "strings" "syscall" + "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" cephfs2 "github.com/ceph/go-ceph/cephfs" userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" - ctx2 "github.com/cs3org/reva/pkg/ctx" "github.com/cs3org/reva/pkg/mime" "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/pkg/errors" @@ -52,7 +52,7 @@ type User struct { } func (fs *cephfs) makeUser(ctx context.Context) *User { - u := ctx2.ContextMustGetUser(ctx) + u := appctx.ContextMustGetUser(ctx) home := fs.conf.Root if !fs.conf.DisableHome { home = filepath.Join(fs.conf.Root, templates.WithUser(u, fs.conf.UserLayout)) From ebee345fd1044c32f2d1975720479a8a7846c01c Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Thu, 2 Nov 2023 12:17:40 +0100 Subject: [PATCH 3/5] Rename ceph package --- pkg/storage/fs/cephfs/cephfs.go | 22 +++++++++++----------- pkg/storage/fs/cephfs/chunking.go | 10 +++++----- pkg/storage/fs/cephfs/connections.go | 14 +++++++------- pkg/storage/fs/cephfs/permissions.go | 4 ++-- pkg/storage/fs/cephfs/user.go | 4 ++-- pkg/storage/fs/cephfs/utils.go | 14 +++++++------- 6 files changed, 34 insertions(+), 34 deletions(-) diff --git a/pkg/storage/fs/cephfs/cephfs.go b/pkg/storage/fs/cephfs/cephfs.go index fc1828bc08..85dec8bc07 100644 --- a/pkg/storage/fs/cephfs/cephfs.go +++ b/pkg/storage/fs/cephfs/cephfs.go @@ -31,7 +31,7 @@ import ( "strconv" "strings" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" @@ -81,7 +81,7 @@ func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err erro } for _, dir := range []string{o.ShadowFolder, o.UploadFolder} { - _, err := adminConn.adminMount.Statx(dir, cephfs2.StatxMask(cephfs2.StatxIno), 0) + _, err := adminConn.adminMount.Statx(dir, goceph.StatxMask(goceph.StatxIno), 0) if err != nil { err = adminConn.adminMount.MakeDir(dir, dirPermFull) if err != nil && err.Error() != errFileExists { @@ -115,7 +115,7 @@ func (fs *cephfs) CreateHome(ctx context.Context) (err error) { user := fs.makeUser(ctx) // Stop createhome from running the whole thing because it is called multiple times - if _, err = fs.adminConn.adminMount.Statx(user.home, cephfs2.StatxMode, 0); err == nil { + if _, err = fs.adminConn.adminMount.Statx(user.home, goceph.StatxMode, 0); err == nil { return } @@ -224,7 +224,7 @@ func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []s user.op(func(cv *cacheVal) { var stat Statx - if stat, err = cv.mount.Statx(path, cephfs2.StatxBasicStats, 0); err != nil { + if stat, err = cv.mount.Statx(path, goceph.StatxBasicStats, 0); err != nil { return } ri, err = user.fileAsResourceInfo(cv, path, stat, mdKeys) @@ -248,15 +248,15 @@ func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKey } user.op(func(cv *cacheVal) { - var dir *cephfs2.Directory + var dir *goceph.Directory if dir, err = cv.mount.OpenDir(path); err != nil { return } defer closeDir(dir) - var entry *cephfs2.DirEntryPlus + var entry *goceph.DirEntryPlus var ri *provider.ResourceInfo - for entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0) { + for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) { if fs.conf.HiddenDirs[entry.Name()] { continue } @@ -309,7 +309,7 @@ func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (f err = errtypes.PermissionDenied("cephfs: cannot download under the virtual share folder") return } - var dir *cephfs2.Directory + var dir *goceph.Directory if dir, err = cv.mount.OpenDir(".snap"); err != nil { return } @@ -328,7 +328,7 @@ func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (f if e != nil { continue } - stat, e = cv.mount.Statx(revPath, cephfs2.StatxMtime|cephfs2.StatxSize, 0) + stat, e = cv.mount.Statx(revPath, goceph.StatxMtime|goceph.StatxSize, 0) if e != nil { continue } @@ -375,7 +375,7 @@ func (fs *cephfs) RestoreRevision(ctx context.Context, ref *provider.Reference, return } - var src, dst *cephfs2.File + var src, dst *goceph.File if src, err = cv.mount.Open(revPath, os.O_RDONLY, 0); err != nil { return } @@ -578,7 +578,7 @@ func (fs *cephfs) TouchFile(ctx context.Context, ref *provider.Reference) error } user.op(func(cv *cacheVal) { - var file *cephfs2.File + var file *goceph.File defer closeFile(file) if file, err = cv.mount.Open(path, os.O_CREATE|os.O_WRONLY, fs.conf.FilePerms); err != nil { return diff --git a/pkg/storage/fs/cephfs/chunking.go b/pkg/storage/fs/cephfs/chunking.go index 9929834132..1f228821cb 100644 --- a/pkg/storage/fs/cephfs/chunking.go +++ b/pkg/storage/fs/cephfs/chunking.go @@ -32,7 +32,7 @@ import ( "strings" "time" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" "github.com/google/uuid" ) @@ -116,7 +116,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu chunkTempFilename := c.getChunkTempFileName() c.user.op(func(cv *cacheVal) { - var tmpFile *cephfs2.File + var tmpFile *goceph.File target := filepath.Join(c.chunkFolder, chunkTempFilename) tmpFile, err = cv.mount.Open(target, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms) defer closeFile(tmpFile) @@ -150,9 +150,9 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu // assembly the chunks when the client asks for it. numEntries := 0 c.user.op(func(cv *cacheVal) { - var dir *cephfs2.Directory - var entry *cephfs2.DirEntry - var chunkFile, assembledFile *cephfs2.File + var dir *goceph.Directory + var entry *goceph.DirEntry + var chunkFile, assembledFile *goceph.File dir, err = cv.mount.OpenDir(chunksFolderName) defer closeDir(dir) diff --git a/pkg/storage/fs/cephfs/connections.go b/pkg/storage/fs/cephfs/connections.go index 8f35860eb1..c72fec6a76 100644 --- a/pkg/storage/fs/cephfs/connections.go +++ b/pkg/storage/fs/cephfs/connections.go @@ -34,14 +34,14 @@ import ( "github.com/cs3org/reva/pkg/rgrpc/todo/pool" "github.com/pkg/errors" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" "github.com/dgraph-io/ristretto" "golang.org/x/sync/semaphore" ) type cacheVal struct { - perm *cephfs2.UserPerm - mount *cephfs2.MountInfo + perm *goceph.UserPerm + mount *goceph.MountInfo } //TODO: Add to cephfs obj @@ -163,7 +163,7 @@ func newAdminConn(conf *Options) *adminConn { } */ - mount, err := cephfs2.CreateFromRados(rados) + mount, err := goceph.CreateFromRados(rados) if err != nil { rados.Shutdown() return nil @@ -185,8 +185,8 @@ func newAdminConn(conf *Options) *adminConn { } func newConn(user *User) *cacheVal { - var perm *cephfs2.UserPerm - mount, err := cephfs2.CreateMountWithId(user.fs.conf.ClientID) + var perm *goceph.UserPerm + mount, err := goceph.CreateMountWithId(user.fs.conf.ClientID) if err != nil { return destroyCephConn(mount, perm) } @@ -203,7 +203,7 @@ func newConn(user *User) *cacheVal { } if user != nil { //nil creates admin conn - perm = cephfs2.NewUserPerm(int(user.UidNumber), int(user.GidNumber), []int{}) + perm = goceph.NewUserPerm(int(user.UidNumber), int(user.GidNumber), []int{}) if err = mount.SetMountPerms(perm); err != nil { return destroyCephConn(mount, perm) } diff --git a/pkg/storage/fs/cephfs/permissions.go b/pkg/storage/fs/cephfs/permissions.go index d9fc1ce9cc..e2d78de375 100644 --- a/pkg/storage/fs/cephfs/permissions.go +++ b/pkg/storage/fs/cephfs/permissions.go @@ -29,7 +29,7 @@ import ( userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/maxymania/go-system/posix_acl" @@ -65,7 +65,7 @@ const ( var op2int = map[rune]uint16{'r': 4, 'w': 2, 'x': 1} -func getPermissionSet(user *User, stat *cephfs2.CephStatx, mount Mount, path string) (perm *provider.ResourcePermissions) { +func getPermissionSet(user *User, stat *goceph.CephStatx, mount Mount, path string) (perm *provider.ResourcePermissions) { perm = &provider.ResourcePermissions{} if int64(stat.Uid) == user.UidNumber || int64(stat.Gid) == user.GidNumber { diff --git a/pkg/storage/fs/cephfs/user.go b/pkg/storage/fs/cephfs/user.go index 0ab2fb163d..4bc051ff2c 100644 --- a/pkg/storage/fs/cephfs/user.go +++ b/pkg/storage/fs/cephfs/user.go @@ -32,7 +32,7 @@ import ( "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" @@ -92,7 +92,7 @@ func (user *User) op(cb callBack) { cb(val.(*cacheVal)) } -func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *cephfs2.CephStatx, mdKeys []string) (ri *provider.ResourceInfo, err error) { +func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.CephStatx, mdKeys []string) (ri *provider.ResourceInfo, err error) { var ( _type provider.ResourceType target string diff --git a/pkg/storage/fs/cephfs/utils.go b/pkg/storage/fs/cephfs/utils.go index db0e27c759..b54c1f03ee 100644 --- a/pkg/storage/fs/cephfs/utils.go +++ b/pkg/storage/fs/cephfs/utils.go @@ -30,33 +30,33 @@ import ( "path/filepath" "strconv" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" ) // Mount type -type Mount = *cephfs2.MountInfo +type Mount = *goceph.MountInfo // Statx type -type Statx = *cephfs2.CephStatx +type Statx = *goceph.CephStatx var dirPermFull = uint32(0777) var dirPermDefault = uint32(0700) var filePermDefault = uint32(0640) -func closeDir(directory *cephfs2.Directory) { +func closeDir(directory *goceph.Directory) { if directory != nil { _ = directory.Close() } } -func closeFile(file *cephfs2.File) { +func closeFile(file *goceph.File) { if file != nil { _ = file.Close() } } -func destroyCephConn(mt Mount, perm *cephfs2.UserPerm) *cacheVal { +func destroyCephConn(mt Mount, perm *goceph.UserPerm) *cacheVal { if perm != nil { perm.Destroy() } @@ -66,7 +66,7 @@ func destroyCephConn(mt Mount, perm *cephfs2.UserPerm) *cacheVal { return nil } -func deleteFile(mount *cephfs2.MountInfo, path string) { +func deleteFile(mount *goceph.MountInfo, path string) { _ = mount.Unlink(path) } From 36e20b49a18af729d512ca0f7903326c3dd41300 Mon Sep 17 00:00:00 2001 From: Giuseppe Lo Presti Date: Thu, 26 Oct 2023 09:26:37 +0200 Subject: [PATCH 4/5] Implementation of CS3APIs Locks in the CephFS driver (#4280) * Refactoring * Implemented locking for CephFS * Minor alignments in the EOS driver * Cleanup * Alternate implementation of GetLock * Applied suggestions Co-authored-by: Javier Ferrer * Applied suggestions Co-authored-by: Javier Ferrer * Applied suggestions Co-authored-by: Javier Ferrer * Applied suggestions Co-authored-by: Javier Ferrer * Applied suggestions Co-authored-by: Javier Ferrer * Further fixes --------- Co-authored-by: Javier Ferrer --- pkg/storage/fs/cephfs/cephfs.go | 202 +++++++++++++++++++++++++++++++- pkg/storage/fs/cephfs/errors.go | 2 +- 2 files changed, 198 insertions(+), 6 deletions(-) diff --git a/pkg/storage/fs/cephfs/cephfs.go b/pkg/storage/fs/cephfs/cephfs.go index 85dec8bc07..1748cf7f14 100644 --- a/pkg/storage/fs/cephfs/cephfs.go +++ b/pkg/storage/fs/cephfs/cephfs.go @@ -23,13 +23,17 @@ package cephfs import ( "context" + b64 "encoding/base64" + "encoding/json" "fmt" + "hash/fnv" "io" "net/url" "os" "path/filepath" "strconv" "strings" + "time" goceph "github.com/ceph/go-ceph/cephfs" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -37,6 +41,7 @@ import ( "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" + "github.com/cs3org/reva/pkg/utils" "github.com/cs3org/reva/pkg/utils/cfg" "github.com/pkg/errors" ) @@ -49,6 +54,7 @@ const ( xattrRef = xattrTrustedNs + "ref" xattrUserNs = "user." snap = ".snap" + xattrLock = xattrUserNs + "reva.lockpayload" ) type cephfs struct { @@ -618,18 +624,204 @@ func (fs *cephfs) UpdateStorageSpace(ctx context.Context, req *provider.UpdateSt return nil, errtypes.NotSupported("unimplemented") } +var fnvHash = fnv.New32a() + +func getHash(s string) uint64 { + fnvHash.Write([]byte(s)) + defer fnvHash.Reset() + return uint64(fnvHash.Sum32()) +} + +func encodeLock(l *provider.Lock) string { + data, _ := json.Marshal(l) + return b64.StdEncoding.EncodeToString(data) +} + +func decodeLock(content string) (*provider.Lock, error) { + d, err := b64.StdEncoding.DecodeString(content) + if err != nil { + return nil, err + } + + l := &provider.Lock{} + if err = json.Unmarshal(d, l); err != nil { + return nil, err + } + + return l, nil +} + func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { - return errtypes.NotSupported("unimplemented") + user := fs.makeUser(ctx) + path, err := user.resolveRef(ref) + if err != nil { + return getRevaError(err) + } + + op := goceph.LockEX + if lock.Type == provider.LockType_LOCK_TYPE_SHARED { + op = goceph.LockSH + } + + user.op(func(cv *cacheVal) { + var file *goceph.File + defer closeFile(file) + if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { + return + } + + err = file.Flock(op|goceph.LockNB, getHash(lock.AppName)) + }) + + if err == nil { + // ok, we got the flock, now also store the related lock metadata + md := &provider.ArbitraryMetadata{ + Metadata: map[string]string{ + xattrLock: encodeLock(lock), + }, + } + return fs.SetArbitraryMetadata(ctx, ref, md) + } + + return getRevaError(err) } func (fs *cephfs) GetLock(ctx context.Context, ref *provider.Reference) (*provider.Lock, error) { - return nil, errtypes.NotSupported("unimplemented") + user := fs.makeUser(ctx) + path, err := user.resolveRef(ref) + if err != nil { + return nil, getRevaError(err) + } + + var l *provider.Lock + user.op(func(cv *cacheVal) { + buf, errXattr := cv.mount.GetXattr(path, xattrLock) + if errXattr == nil { + if l, err = decodeLock(string(buf)); err != nil { + err = errors.Wrap(err, "malformed lock payload") + return + } + } + + var file *goceph.File + defer closeFile(file) + if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { + // try and open with read-only permissions: if this succeeds, we just return + // the metadata as is, otherwise we return the error on Open() + if file, err = cv.mount.Open(path, os.O_RDONLY, fs.conf.FilePerms); err != nil { + l = nil + } + return + } + + if err = file.Flock(goceph.LockEX|goceph.LockNB, 0); err == nil { + // success means the file was not locked, drop related metadata if present + if l != nil { + fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) + l = nil + } + file.Flock(goceph.LockUN|goceph.LockNB, 0) + err = errtypes.NotFound("file was not locked") + return + } + + if errXattr != nil { + // error here means we have a "foreign" flock with no CS3 metadata + err = nil + l = new(provider.Lock) + l.AppName = "External" + return + } + + if time.Unix(int64(l.Expiration.Seconds), 0).After(time.Now()) { + // the lock expired, drop + fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) + file.Flock(goceph.LockUN|goceph.LockNB, getHash(l.AppName)) + err = errtypes.NotFound("file was not locked") + l = nil + } + return + }) + + return l, getRevaError(err) } -func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock, existingLockID string) error { - return errtypes.NotSupported("unimplemented") +// TODO(lopresti) part of this logic is duplicated from eosfs.go, should be factored out +func sameHolder(l1, l2 *provider.Lock) bool { + same := true + if l1.User != nil || l2.User != nil { + same = utils.UserEqual(l1.User, l2.User) + } + if l1.AppName != "" || l2.AppName != "" { + same = l1.AppName == l2.AppName + } + return same +} + +func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, newLock *provider.Lock, existingLockID string) error { + oldLock, err := fs.GetLock(ctx, ref) + if err != nil { + switch err.(type) { + case errtypes.NotFound: + // the lock does not exist + return errtypes.BadRequest("file was not locked") + default: + return err + } + } + + // check if the holder is the same of the new lock + if !sameHolder(oldLock, newLock) { + return errtypes.BadRequest("caller does not hold the lock") + } + + if existingLockID != "" && oldLock.LockId != existingLockID { + return errtypes.BadRequest("lock id does not match") + } + + return fs.SetLock(ctx, ref, newLock) } func (fs *cephfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { - return errtypes.NotSupported("unimplemented") + user := fs.makeUser(ctx) + path, err := user.resolveRef(ref) + if err != nil { + return getRevaError(err) + } + + oldLock, err := fs.GetLock(ctx, ref) + if err != nil { + switch err.(type) { + case errtypes.NotFound: + // the lock does not exist + return errtypes.BadRequest("file not found or not locked") + default: + return err + } + } + + // check if the lock id of the lock corresponds to the stored lock + if oldLock.LockId != lock.LockId { + return errtypes.BadRequest("lock id does not match") + } + + if !sameHolder(oldLock, lock) { + return errtypes.BadRequest("caller does not hold the lock") + } + + user.op(func(cv *cacheVal) { + var file *goceph.File + defer closeFile(file) + if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { + return + } + + err = file.Flock(goceph.LockUN|goceph.LockNB, getHash(lock.AppName)) + }) + + if err == nil { + return fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) + } + + return getRevaError(err) } diff --git a/pkg/storage/fs/cephfs/errors.go b/pkg/storage/fs/cephfs/errors.go index 02f97bc1f5..2e92575390 100644 --- a/pkg/storage/fs/cephfs/errors.go +++ b/pkg/storage/fs/cephfs/errors.go @@ -51,7 +51,7 @@ func getRevaError(err error) error { } switch err.Error() { case errNotFound: - return errtypes.NotFound("cephfs: dir entry not found") + return errtypes.NotFound("cephfs: entry not found") case errPermissionDenied: return errtypes.PermissionDenied("cephfs: permission denied") case errFileExists: From d271e6eed7104a6ce2012c0081138917c2c2420d Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Thu, 2 Nov 2023 12:31:58 +0100 Subject: [PATCH 5/5] Changelog --- changelog/unreleased/recover-ceph-driver.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/changelog/unreleased/recover-ceph-driver.md b/changelog/unreleased/recover-ceph-driver.md index 58d4dcef70..d4d5f70a20 100644 --- a/changelog/unreleased/recover-ceph-driver.md +++ b/changelog/unreleased/recover-ceph-driver.md @@ -1,6 +1,6 @@ Bugfix: Restore changes to ceph driver PR [4166](https://github.com/cs3org/reva/pull/4166) accidentally reverted the -ceph driver changes. This bugfix recovers them. +ceph driver changes. This PR recovers them. -https://github.com/cs3org/reva/pull/???? +https://github.com/cs3org/reva/pull/4310