diff --git a/changelog/unreleased/ceph-locks.md b/changelog/unreleased/ceph-locks.md new file mode 100644 index 00000000000..75e9be878fe --- /dev/null +++ b/changelog/unreleased/ceph-locks.md @@ -0,0 +1,5 @@ +Enhancement: implementation of Locks for the CephFS driver + +This PR brings CS3APIs Locks for CephFS + +https://github.com/cs3org/reva/pull/4280 diff --git a/pkg/storage/fs/cephfs/cephfs.go b/pkg/storage/fs/cephfs/cephfs.go index 7329a7252eb..7a05ecca700 100644 --- a/pkg/storage/fs/cephfs/cephfs.go +++ b/pkg/storage/fs/cephfs/cephfs.go @@ -22,14 +22,21 @@ package cephfs import ( + "bytes" "context" + b64 "encoding/base64" + "encoding/binary" + "encoding/json" "fmt" + "hash" + "hash/fnv" "io" "net/url" "os" "path/filepath" "strconv" "strings" + "time" goceph "github.com/ceph/go-ceph/cephfs" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -37,6 +44,7 @@ import ( "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" + "github.com/cs3org/reva/pkg/utils" "github.com/cs3org/reva/pkg/utils/cfg" "github.com/pkg/errors" ) @@ -49,6 +57,7 @@ const ( xattrRef = xattrTrustedNs + "ref" xattrUserNs = "user." snap = ".snap" + xattrLock = xattrUserNs + "reva.lockpayload" ) type cephfs struct { @@ -615,18 +624,205 @@ func (fs *cephfs) UpdateStorageSpace(ctx context.Context, req *provider.UpdateSt return nil, errtypes.NotSupported("unimplemented") } +var fnvHash hash.Hash32 = fnv.New32a() + +func getHash(s string) uint64 { + fnvHash.Write([]byte(s)) + defer fnvHash.Reset() + buf := bytes.NewReader(fnvHash.Sum(nil)) + var res uint32 + binary.Read(buf, binary.BigEndian, &res) + return uint64(res) + // h := fnvHash.Sum(nil) + // return uint64(h[0]) + uint64(h[1])*uint64(2<<8) + uint64(h[2])*uint64(2<<16) + uint64(h[3])*uint64(2<<24) +} + +func encodeLock(l *provider.Lock) string { + data, _ := json.Marshal(l) + return b64.StdEncoding.EncodeToString(data) +} + +func decodeLock(content string) (*provider.Lock, error) { + d, err := b64.StdEncoding.DecodeString(content) + if err != nil { + return nil, err + } + + l := new(provider.Lock) + err = json.Unmarshal(d, l) + if err != nil { + return nil, err + } + + return l, nil +} + +// TODO(lopresti) part of this logic is duplicated from eosfs.go, should be factored out func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { - return errtypes.NotSupported("unimplemented") + user := fs.makeUser(ctx) + path, err := user.resolveRef(ref) + if err != nil { + return getRevaError(err) + } + + op := goceph.LockEX + if lock.Type == provider.LockType_LOCK_TYPE_SHARED { + op = goceph.LockSH + } + + user.op(func(cv *cacheVal) { + var file *goceph.File + defer closeFile(file) + if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { + return + } + + if err = file.Flock(op|goceph.LockNB, getHash(lock.AppName)); err != nil { + // already locked? + return + } + }) + + if err == nil { + md := &provider.ArbitraryMetadata{ + Metadata: map[string]string{ + xattrLock: encodeLock(lock), + }, + } + return fs.SetArbitraryMetadata(ctx, ref, md) + } + + return getRevaError(err) } func (fs *cephfs) GetLock(ctx context.Context, ref *provider.Reference) (*provider.Lock, error) { - return nil, errtypes.NotSupported("unimplemented") + user := fs.makeUser(ctx) + path, err := user.resolveRef(ref) + if err != nil { + return nil, getRevaError(err) + } + + var l *provider.Lock + user.op(func(cv *cacheVal) { + var file *goceph.File + defer closeFile(file) + // TODO(lopresti) O_RDONLY should be enough, but we want to try and grab a lock to test if a lock existed + if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { + return + } + + if err = file.Flock(goceph.LockEX|goceph.LockNB, 0); err == nil { + // success means file was not locked + file.Flock(goceph.LockUN|goceph.LockNB, 0) + err = errtypes.NotFound("file was not locked") + return + } + + buf, err := cv.mount.GetXattr(path, xattrLock) + if err != nil { + // error here means we have a "foreign" flock with no CS3 metadata + err = nil + l = new(provider.Lock) + l.AppName = "External" + return + } + + if l, err = decodeLock(string(buf)); err != nil { + l = nil + err = errors.Wrap(err, "cephfs: malformed lock payload") + return + } + + if time.Unix(int64(l.Expiration.Seconds), 0).After(time.Now()) { + // the lock expired, drop + fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) + file.Flock(goceph.LockUN|goceph.LockNB, getHash(l.AppName)) + err = errtypes.NotFound("file was not locked") + l = nil + } + return + }) + + return l, getRevaError(err) } -func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock, existingLockID string) error { - return errtypes.NotSupported("unimplemented") +func sameHolder(l1, l2 *provider.Lock) bool { + same := true + if l1.User != nil || l2.User != nil { + same = utils.UserEqual(l1.User, l2.User) + } + if l1.AppName != "" || l2.AppName != "" { + same = l1.AppName == l2.AppName + } + return same +} + +func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, newLock *provider.Lock, existingLockID string) error { + oldLock, err := fs.GetLock(ctx, ref) + if err != nil { + switch err.(type) { + case errtypes.NotFound: + // the lock does not exist + return errtypes.BadRequest("file was not locked") + default: + return err + } + } + + // check if the holder is the same of the new lock + if !sameHolder(oldLock, newLock) { + return errtypes.BadRequest("caller does not hold the lock") + } + + if existingLockID != "" && oldLock.LockId != existingLockID { + return errtypes.BadRequest("mismatching existing lock id") + } + + return fs.SetLock(ctx, ref, newLock) } func (fs *cephfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { - return errtypes.NotSupported("unimplemented") + user := fs.makeUser(ctx) + path, err := user.resolveRef(ref) + if err != nil { + return getRevaError(err) + } + + oldLock, err := fs.GetLock(ctx, ref) + if err != nil { + switch err.(type) { + case errtypes.NotFound: + // the lock does not exist + return errtypes.BadRequest("file not found or not locked") + default: + return err + } + } + + // check if the lock id of the lock corresponds to the stored lock + if oldLock.LockId != lock.LockId { + return errtypes.BadRequest("lock id does not match") + } + + if !sameHolder(oldLock, lock) { + return errtypes.BadRequest("caller does not hold the lock") + } + + user.op(func(cv *cacheVal) { + var file *goceph.File + defer closeFile(file) + if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil { + return + } + + if err = file.Flock(goceph.LockUN|goceph.LockNB, getHash(lock.AppName)); err != nil { + return + } + }) + + if err == nil { + return fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) + } + + return getRevaError(err) } diff --git a/pkg/storage/fs/cephfs/errors.go b/pkg/storage/fs/cephfs/errors.go index 02f97bc1f5a..2e92575390a 100644 --- a/pkg/storage/fs/cephfs/errors.go +++ b/pkg/storage/fs/cephfs/errors.go @@ -51,7 +51,7 @@ func getRevaError(err error) error { } switch err.Error() { case errNotFound: - return errtypes.NotFound("cephfs: dir entry not found") + return errtypes.NotFound("cephfs: entry not found") case errPermissionDenied: return errtypes.PermissionDenied("cephfs: permission denied") case errFileExists: diff --git a/pkg/storage/utils/eosfs/eosfs.go b/pkg/storage/utils/eosfs/eosfs.go index bb5ade3f46f..da65a3545eb 100644 --- a/pkg/storage/utils/eosfs/eosfs.go +++ b/pkg/storage/utils/eosfs/eosfs.go @@ -77,7 +77,7 @@ const ( const EosLockKey = "app.lock" // LockPayloadKey is the key in the xattrs used to store the lock payload. -const LockPayloadKey = "reva.lock.payload" +const LockPayloadKey = "reva.lockpayload" var hiddenReg = regexp.MustCompile(`\.sys\..#.`) @@ -887,7 +887,7 @@ func (fs *eosfs) RefreshLock(ctx context.Context, ref *provider.Reference, newLo return errors.Wrap(err, "eosfs: cannot check if user has write access on resource") } if !has { - return errtypes.PermissionDenied(fmt.Sprintf("user %s has not write access on resource", user.Username)) + return errtypes.PermissionDenied(fmt.Sprintf("user %s has no write access on resource", user.Username)) } return fs.setLock(ctx, newLock, path) @@ -906,10 +906,6 @@ func sameHolder(l1, l2 *provider.Lock) bool { // Unlock removes an existing lock from the given reference. func (fs *eosfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { - if lock.Type == provider.LockType_LOCK_TYPE_SHARED { - return errtypes.NotSupported("shared lock not yet implemented") - } - oldLock, err := fs.GetLock(ctx, ref) if err != nil { switch err.(type) { @@ -942,7 +938,7 @@ func (fs *eosfs) Unlock(ctx context.Context, ref *provider.Reference, lock *prov return errors.Wrap(err, "eosfs: cannot check if user has write access on resource") } if !has { - return errtypes.PermissionDenied(fmt.Sprintf("user %s has not write access on resource", user.Username)) + return errtypes.PermissionDenied(fmt.Sprintf("user %s has no write access on resource", user.Username)) } path, err := fs.resolve(ctx, ref)