diff --git a/changelog/unreleased/ceph-locks.md b/changelog/unreleased/ceph-locks.md new file mode 100644 index 0000000000..75e9be878f --- /dev/null +++ b/changelog/unreleased/ceph-locks.md @@ -0,0 +1,5 @@ +Enhancement: implementation of Locks for the CephFS driver + +This PR brings CS3APIs Locks for CephFS + +https://github.com/cs3org/reva/pull/4280 diff --git a/pkg/storage/fs/cephfs/cephfs.go b/pkg/storage/fs/cephfs/cephfs.go index e9e175aff4..bdc4bf73cb 100644 --- a/pkg/storage/fs/cephfs/cephfs.go +++ b/pkg/storage/fs/cephfs/cephfs.go @@ -23,20 +23,25 @@ package cephfs import ( "context" + b64 "encoding/base64" + "encoding/json" "fmt" + "hash/fnv" "io" "net/url" "os" "path/filepath" "strconv" "strings" + "time" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" + "github.com/cs3org/reva/pkg/utils" "github.com/cs3org/reva/pkg/utils/cfg" "github.com/pkg/errors" ) @@ -49,6 +54,7 @@ const ( xattrRef = xattrTrustedNs + "ref" xattrUserNs = "user." snap = ".snap" + xattrLock = xattrUserNs + "reva.lockpayload" ) type cephfs struct { @@ -112,7 +118,7 @@ func (fs *cephfs) CreateHome(ctx context.Context) (err error) { user := fs.makeUser(ctx) // Stop createhome from running the whole thing because it is called multiple times - if _, err = fs.adminConn.adminMount.Statx(user.home, cephfs2.StatxMode, 0); err == nil { + if _, err = fs.adminConn.adminMount.Statx(user.home, goceph.StatxMode, 0); err == nil { return } @@ -221,7 +227,7 @@ func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []s user.op(func(cv *cacheVal) { var stat Statx - if stat, err = cv.mount.Statx(path, cephfs2.StatxBasicStats, 0); err != nil { + if stat, err = cv.mount.Statx(path, goceph.StatxBasicStats, 0); err != nil { return } ri, err = user.fileAsResourceInfo(cv, path, stat, mdKeys) @@ -245,15 +251,15 @@ func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKey } user.op(func(cv *cacheVal) { - var dir *cephfs2.Directory + var dir *goceph.Directory if dir, err = cv.mount.OpenDir(path); err != nil { return } defer closeDir(dir) - var entry *cephfs2.DirEntryPlus + var entry *goceph.DirEntryPlus var ri *provider.ResourceInfo - for entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0) { + for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) { if fs.conf.HiddenDirs[entry.Name()] { continue } @@ -306,7 +312,7 @@ func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (f err = errtypes.PermissionDenied("cephfs: cannot download under the virtual share folder") return } - var dir *cephfs2.Directory + var dir *goceph.Directory if dir, err = cv.mount.OpenDir(".snap"); err != nil { return } @@ -325,7 +331,7 @@ func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (f if e != nil { continue } - stat, e = cv.mount.Statx(revPath, cephfs2.StatxMtime|cephfs2.StatxSize, 0) + stat, e = cv.mount.Statx(revPath, goceph.StatxMtime|goceph.StatxSize, 0) if e != nil { continue } @@ -372,7 +378,7 @@ func (fs *cephfs) RestoreRevision(ctx context.Context, ref *provider.Reference, return } - var src, dst *cephfs2.File + var src, dst *goceph.File if src, err = cv.mount.Open(revPath, os.O_RDONLY, 0); err != nil { return } @@ -575,7 +581,7 @@ func (fs *cephfs) TouchFile(ctx context.Context, ref *provider.Reference) error } user.op(func(cv *cacheVal) { - var file *cephfs2.File + var file *goceph.File defer closeFile(file) if file, err = cv.mount.Open(path, os.O_CREATE|os.O_WRONLY, fs.conf.FilePerms); err != nil { return @@ -615,18 +621,204 @@ func (fs *cephfs) UpdateStorageSpace(ctx context.Context, req *provider.UpdateSt return nil, errtypes.NotSupported("unimplemented") } +var fnvHash = fnv.New32a() + +func getHash(s string) uint64 { + fnvHash.Write([]byte(s)) + defer fnvHash.Reset() + return uint64(fnvHash.Sum32()) +} + +func encodeLock(l *provider.Lock) string { + data, _ := json.Marshal(l) + return b64.StdEncoding.EncodeToString(data) +} + +func decodeLock(content string) (*provider.Lock, error) { + d, err := b64.StdEncoding.DecodeString(content) + if err != nil { + return nil, err + } + + l := &provider.Lock{} + if err = json.Unmarshal(d, l); err != nil { + return nil, err + } + + return l, nil +} + func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { - return errtypes.NotSupported("unimplemented") + user := fs.makeUser(ctx) + path, err := user.resolveRef(ref) + if err != nil { + return getRevaError(err) + } + + op := goceph.LockEX + if lock.Type == provider.LockType_LOCK_TYPE_SHARED { + op = goceph.LockSH + } + + user.op(func(cv *cacheVal) { + var file *goceph.File + defer closeFile(file) + if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { + return + } + + err = file.Flock(op|goceph.LockNB, getHash(lock.AppName)) + }) + + if err == nil { + // ok, we got the flock, now also store the related lock metadata + md := &provider.ArbitraryMetadata{ + Metadata: map[string]string{ + xattrLock: encodeLock(lock), + }, + } + return fs.SetArbitraryMetadata(ctx, ref, md) + } + + return getRevaError(err) } func (fs *cephfs) GetLock(ctx context.Context, ref *provider.Reference) (*provider.Lock, error) { - return nil, errtypes.NotSupported("unimplemented") + user := fs.makeUser(ctx) + path, err := user.resolveRef(ref) + if err != nil { + return nil, getRevaError(err) + } + + var l *provider.Lock + user.op(func(cv *cacheVal) { + buf, errXattr := cv.mount.GetXattr(path, xattrLock) + if errXattr == nil { + if l, err = decodeLock(string(buf)); err != nil { + err = errors.Wrap(err, "malformed lock payload") + return + } + } + + var file *goceph.File + defer closeFile(file) + if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { + // try and open with read-only permissions: if this succeeds, we just return + // the metadata as is, otherwise we return the error on Open() + if file, err = cv.mount.Open(path, os.O_RDONLY, fs.conf.FilePerms); err != nil { + l = nil + } + return + } + + if err = file.Flock(goceph.LockEX|goceph.LockNB, 0); err == nil { + // success means the file was not locked, drop related metadata if present + if l != nil { + fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) + l = nil + } + file.Flock(goceph.LockUN|goceph.LockNB, 0) + err = errtypes.NotFound("file was not locked") + return + } + + if errXattr != nil { + // error here means we have a "foreign" flock with no CS3 metadata + err = nil + l = new(provider.Lock) + l.AppName = "External" + return + } + + if time.Unix(int64(l.Expiration.Seconds), 0).After(time.Now()) { + // the lock expired, drop + fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) + file.Flock(goceph.LockUN|goceph.LockNB, getHash(l.AppName)) + err = errtypes.NotFound("file was not locked") + l = nil + } + return + }) + + return l, getRevaError(err) } -func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock, existingLockID string) error { - return errtypes.NotSupported("unimplemented") +// TODO(lopresti) part of this logic is duplicated from eosfs.go, should be factored out +func sameHolder(l1, l2 *provider.Lock) bool { + same := true + if l1.User != nil || l2.User != nil { + same = utils.UserEqual(l1.User, l2.User) + } + if l1.AppName != "" || l2.AppName != "" { + same = l1.AppName == l2.AppName + } + return same +} + +func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, newLock *provider.Lock, existingLockID string) error { + oldLock, err := fs.GetLock(ctx, ref) + if err != nil { + switch err.(type) { + case errtypes.NotFound: + // the lock does not exist + return errtypes.BadRequest("file was not locked") + default: + return err + } + } + + // check if the holder is the same of the new lock + if !sameHolder(oldLock, newLock) { + return errtypes.BadRequest("caller does not hold the lock") + } + + if existingLockID != "" && oldLock.LockId != existingLockID { + return errtypes.BadRequest("lock id does not match") + } + + return fs.SetLock(ctx, ref, newLock) } func (fs *cephfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { - return errtypes.NotSupported("unimplemented") + user := fs.makeUser(ctx) + path, err := user.resolveRef(ref) + if err != nil { + return getRevaError(err) + } + + oldLock, err := fs.GetLock(ctx, ref) + if err != nil { + switch err.(type) { + case errtypes.NotFound: + // the lock does not exist + return errtypes.BadRequest("file not found or not locked") + default: + return err + } + } + + // check if the lock id of the lock corresponds to the stored lock + if oldLock.LockId != lock.LockId { + return errtypes.BadRequest("lock id does not match") + } + + if !sameHolder(oldLock, lock) { + return errtypes.BadRequest("caller does not hold the lock") + } + + user.op(func(cv *cacheVal) { + var file *goceph.File + defer closeFile(file) + if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { + return + } + + err = file.Flock(goceph.LockUN|goceph.LockNB, getHash(lock.AppName)) + }) + + if err == nil { + return fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) + } + + return getRevaError(err) } diff --git a/pkg/storage/fs/cephfs/chunking.go b/pkg/storage/fs/cephfs/chunking.go index fc9dbe0bbd..9ea8f37a14 100644 --- a/pkg/storage/fs/cephfs/chunking.go +++ b/pkg/storage/fs/cephfs/chunking.go @@ -32,7 +32,7 @@ import ( "strings" "time" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" "github.com/google/uuid" ) @@ -118,7 +118,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu chunkTempFilename := c.getChunkTempFileName() c.user.op(func(cv *cacheVal) { - var tmpFile *cephfs2.File + var tmpFile *goceph.File target := filepath.Join(c.chunkFolder, chunkTempFilename) tmpFile, err = cv.mount.Open(target, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms) defer closeFile(tmpFile) @@ -152,9 +152,9 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu // assembly the chunks when the client asks for it. numEntries := 0 c.user.op(func(cv *cacheVal) { - var dir *cephfs2.Directory - var entry *cephfs2.DirEntry - var chunkFile, assembledFile *cephfs2.File + var dir *goceph.Directory + var entry *goceph.DirEntry + var chunkFile, assembledFile *goceph.File dir, err = cv.mount.OpenDir(chunksFolderName) defer closeDir(dir) diff --git a/pkg/storage/fs/cephfs/connections.go b/pkg/storage/fs/cephfs/connections.go index 6ffb0dbb7f..117c39eade 100644 --- a/pkg/storage/fs/cephfs/connections.go +++ b/pkg/storage/fs/cephfs/connections.go @@ -34,14 +34,14 @@ import ( "github.com/cs3org/reva/pkg/rgrpc/todo/pool" "github.com/pkg/errors" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" "github.com/dgraph-io/ristretto" "golang.org/x/sync/semaphore" ) type cacheVal struct { - perm *cephfs2.UserPerm - mount *cephfs2.MountInfo + perm *goceph.UserPerm + mount *goceph.MountInfo } //TODO: Add to cephfs obj @@ -162,7 +162,7 @@ func newAdminConn(conf *Options) *adminConn { } */ - mount, err := cephfs2.CreateFromRados(rados) + mount, err := goceph.CreateFromRados(rados) if err != nil { rados.Shutdown() return nil @@ -184,8 +184,8 @@ func newAdminConn(conf *Options) *adminConn { } func newConn(user *User) *cacheVal { - var perm *cephfs2.UserPerm - mount, err := cephfs2.CreateMountWithId(user.fs.conf.ClientID) + var perm *goceph.UserPerm + mount, err := goceph.CreateMountWithId(user.fs.conf.ClientID) if err != nil { return destroyCephConn(mount, perm) } @@ -202,7 +202,7 @@ func newConn(user *User) *cacheVal { } if user != nil { //nil creates admin conn - perm = cephfs2.NewUserPerm(int(user.UidNumber), int(user.GidNumber), []int{}) + perm = goceph.NewUserPerm(int(user.UidNumber), int(user.GidNumber), []int{}) if err = mount.SetMountPerms(perm); err != nil { return destroyCephConn(mount, perm) } diff --git a/pkg/storage/fs/cephfs/errors.go b/pkg/storage/fs/cephfs/errors.go index 02f97bc1f5..2e92575390 100644 --- a/pkg/storage/fs/cephfs/errors.go +++ b/pkg/storage/fs/cephfs/errors.go @@ -51,7 +51,7 @@ func getRevaError(err error) error { } switch err.Error() { case errNotFound: - return errtypes.NotFound("cephfs: dir entry not found") + return errtypes.NotFound("cephfs: entry not found") case errPermissionDenied: return errtypes.PermissionDenied("cephfs: permission denied") case errFileExists: diff --git a/pkg/storage/fs/cephfs/permissions.go b/pkg/storage/fs/cephfs/permissions.go index d9fc1ce9cc..e2d78de375 100644 --- a/pkg/storage/fs/cephfs/permissions.go +++ b/pkg/storage/fs/cephfs/permissions.go @@ -29,7 +29,7 @@ import ( userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/maxymania/go-system/posix_acl" @@ -65,7 +65,7 @@ const ( var op2int = map[rune]uint16{'r': 4, 'w': 2, 'x': 1} -func getPermissionSet(user *User, stat *cephfs2.CephStatx, mount Mount, path string) (perm *provider.ResourcePermissions) { +func getPermissionSet(user *User, stat *goceph.CephStatx, mount Mount, path string) (perm *provider.ResourcePermissions) { perm = &provider.ResourcePermissions{} if int64(stat.Uid) == user.UidNumber || int64(stat.Gid) == user.GidNumber { diff --git a/pkg/storage/fs/cephfs/upload.go b/pkg/storage/fs/cephfs/upload.go index 0f340e7515..d27baf8377 100644 --- a/pkg/storage/fs/cephfs/upload.go +++ b/pkg/storage/fs/cephfs/upload.go @@ -29,7 +29,7 @@ import ( "os" "path/filepath" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/pkg/appctx" @@ -170,7 +170,7 @@ func (fs *cephfs) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tus // Create binary file with no content user.op(func(cv *cacheVal) { - var f *cephfs2.File + var f *goceph.File defer closeFile(f) f, err = cv.mount.Open(binPath, os.O_CREATE|os.O_WRONLY, fs.conf.FilePerms) if err != nil { @@ -234,7 +234,7 @@ func (fs *cephfs) GetUpload(ctx context.Context, id string) (fup tusd.Upload, er var stat Statx user.op(func(cv *cacheVal) { - stat, err = cv.mount.Statx(binPath, cephfs2.StatxSize, 0) + stat, err = cv.mount.Statx(binPath, goceph.StatxSize, 0) }) if err != nil { return diff --git a/pkg/storage/fs/cephfs/user.go b/pkg/storage/fs/cephfs/user.go index aa5a2723b2..605148f4a2 100644 --- a/pkg/storage/fs/cephfs/user.go +++ b/pkg/storage/fs/cephfs/user.go @@ -31,7 +31,7 @@ import ( "github.com/cs3org/reva/pkg/errtypes" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" @@ -92,7 +92,7 @@ func (user *User) op(cb callBack) { cb(val.(*cacheVal)) } -func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *cephfs2.CephStatx, mdKeys []string) (ri *provider.ResourceInfo, err error) { +func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.CephStatx, mdKeys []string) (ri *provider.ResourceInfo, err error) { var ( _type provider.ResourceType target string diff --git a/pkg/storage/fs/cephfs/utils.go b/pkg/storage/fs/cephfs/utils.go index db0e27c759..b54c1f03ee 100644 --- a/pkg/storage/fs/cephfs/utils.go +++ b/pkg/storage/fs/cephfs/utils.go @@ -30,33 +30,33 @@ import ( "path/filepath" "strconv" - cephfs2 "github.com/ceph/go-ceph/cephfs" + goceph "github.com/ceph/go-ceph/cephfs" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" ) // Mount type -type Mount = *cephfs2.MountInfo +type Mount = *goceph.MountInfo // Statx type -type Statx = *cephfs2.CephStatx +type Statx = *goceph.CephStatx var dirPermFull = uint32(0777) var dirPermDefault = uint32(0700) var filePermDefault = uint32(0640) -func closeDir(directory *cephfs2.Directory) { +func closeDir(directory *goceph.Directory) { if directory != nil { _ = directory.Close() } } -func closeFile(file *cephfs2.File) { +func closeFile(file *goceph.File) { if file != nil { _ = file.Close() } } -func destroyCephConn(mt Mount, perm *cephfs2.UserPerm) *cacheVal { +func destroyCephConn(mt Mount, perm *goceph.UserPerm) *cacheVal { if perm != nil { perm.Destroy() } @@ -66,7 +66,7 @@ func destroyCephConn(mt Mount, perm *cephfs2.UserPerm) *cacheVal { return nil } -func deleteFile(mount *cephfs2.MountInfo, path string) { +func deleteFile(mount *goceph.MountInfo, path string) { _ = mount.Unlink(path) } diff --git a/pkg/storage/utils/eosfs/eosfs.go b/pkg/storage/utils/eosfs/eosfs.go index bb5ade3f46..b9b102c2af 100644 --- a/pkg/storage/utils/eosfs/eosfs.go +++ b/pkg/storage/utils/eosfs/eosfs.go @@ -77,7 +77,7 @@ const ( const EosLockKey = "app.lock" // LockPayloadKey is the key in the xattrs used to store the lock payload. -const LockPayloadKey = "reva.lock.payload" +const LockPayloadKey = "reva.lockpayload" var hiddenReg = regexp.MustCompile(`\.sys\..#.`) @@ -871,7 +871,7 @@ func (fs *eosfs) RefreshLock(ctx context.Context, ref *provider.Reference, newLo } if existingLockID != "" && oldLock.LockId != existingLockID { - return errtypes.BadRequest("mismatching existing lock id") + return errtypes.BadRequest("lock id does not match") } path, err := fs.resolve(ctx, ref) @@ -887,7 +887,7 @@ func (fs *eosfs) RefreshLock(ctx context.Context, ref *provider.Reference, newLo return errors.Wrap(err, "eosfs: cannot check if user has write access on resource") } if !has { - return errtypes.PermissionDenied(fmt.Sprintf("user %s has not write access on resource", user.Username)) + return errtypes.PermissionDenied(fmt.Sprintf("user %s has no write access on resource", user.Username)) } return fs.setLock(ctx, newLock, path) @@ -906,10 +906,6 @@ func sameHolder(l1, l2 *provider.Lock) bool { // Unlock removes an existing lock from the given reference. func (fs *eosfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { - if lock.Type == provider.LockType_LOCK_TYPE_SHARED { - return errtypes.NotSupported("shared lock not yet implemented") - } - oldLock, err := fs.GetLock(ctx, ref) if err != nil { switch err.(type) { @@ -942,7 +938,7 @@ func (fs *eosfs) Unlock(ctx context.Context, ref *provider.Reference, lock *prov return errors.Wrap(err, "eosfs: cannot check if user has write access on resource") } if !has { - return errtypes.PermissionDenied(fmt.Sprintf("user %s has not write access on resource", user.Username)) + return errtypes.PermissionDenied(fmt.Sprintf("user %s has no write access on resource", user.Username)) } path, err := fs.resolve(ctx, ref)