Skip to content

Commit

Permalink
Implemented locking for CephFS
Browse files Browse the repository at this point in the history
With minor alignments on the EOS implementation
  • Loading branch information
glpatcern committed Oct 24, 2023
1 parent be1b695 commit cb09a64
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 13 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/ceph-locks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: implementation of Locks for the CephFS driver

This PR brings CS3APIs Locks for CephFS

https://github.com/cs3org/reva/pull/4280
206 changes: 201 additions & 5 deletions pkg/storage/fs/cephfs/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,29 @@
package cephfs

import (
"bytes"
"context"
b64 "encoding/base64"
"encoding/binary"
"encoding/json"
"fmt"
"hash"
"hash/fnv"
"io"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"

goceph "github.com/ceph/go-ceph/cephfs"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/cs3org/reva/pkg/utils"
"github.com/cs3org/reva/pkg/utils/cfg"
"github.com/pkg/errors"
)
Expand All @@ -49,6 +57,7 @@ const (
xattrRef = xattrTrustedNs + "ref"
xattrUserNs = "user."
snap = ".snap"
xattrLock = xattrUserNs + "reva.lockpayload"
)

type cephfs struct {
Expand Down Expand Up @@ -615,18 +624,205 @@ func (fs *cephfs) UpdateStorageSpace(ctx context.Context, req *provider.UpdateSt
return nil, errtypes.NotSupported("unimplemented")
}

var fnvHash hash.Hash32 = fnv.New32a()

func getHash(s string) uint64 {
fnvHash.Write([]byte(s))
defer fnvHash.Reset()
buf := bytes.NewReader(fnvHash.Sum(nil))
var res uint32
binary.Read(buf, binary.BigEndian, &res)
return uint64(res)
// h := fnvHash.Sum(nil)
// return uint64(h[0]) + uint64(h[1])*uint64(2<<8) + uint64(h[2])*uint64(2<<16) + uint64(h[3])*uint64(2<<24)
}

func encodeLock(l *provider.Lock) string {
data, _ := json.Marshal(l)
return b64.StdEncoding.EncodeToString(data)
}

func decodeLock(content string) (*provider.Lock, error) {
d, err := b64.StdEncoding.DecodeString(content)
if err != nil {
return nil, err
}

l := new(provider.Lock)
err = json.Unmarshal(d, l)
if err != nil {
return nil, err
}

return l, nil
}

// TODO(lopresti) part of this logic is duplicated from eosfs.go, should be factored out
func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error {
return errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
if err != nil {
return getRevaError(err)
}

op := goceph.LockEX
if lock.Type == provider.LockType_LOCK_TYPE_SHARED {
op = goceph.LockSH
}

user.op(func(cv *cacheVal) {
var file *goceph.File
defer closeFile(file)
if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil {
return
}

if err = file.Flock(op|goceph.LockNB, getHash(lock.AppName)); err != nil {
// already locked?
return
}
})

if err == nil {
md := &provider.ArbitraryMetadata{
Metadata: map[string]string{
xattrLock: encodeLock(lock),
},
}
return fs.SetArbitraryMetadata(ctx, ref, md)
}

return getRevaError(err)
}

func (fs *cephfs) GetLock(ctx context.Context, ref *provider.Reference) (*provider.Lock, error) {
return nil, errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
if err != nil {
return nil, getRevaError(err)
}

var l *provider.Lock
user.op(func(cv *cacheVal) {
var file *goceph.File
defer closeFile(file)
// TODO(lopresti) O_RDONLY should be enough, but we want to try and grab a lock to test if a lock existed
if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil {
return
}

if err = file.Flock(goceph.LockEX|goceph.LockNB, 0); err == nil {
// success means file was not locked
file.Flock(goceph.LockUN|goceph.LockNB, 0)
err = errtypes.NotFound("file was not locked")
return
}

buf, err := cv.mount.GetXattr(path, xattrLock)
if err != nil {
// error here means we have a "foreign" flock with no CS3 metadata
err = nil
l = new(provider.Lock)
l.AppName = "External"
return
}

if l, err = decodeLock(string(buf)); err != nil {
l = nil
err = errors.Wrap(err, "cephfs: malformed lock payload")
return
}

if time.Unix(int64(l.Expiration.Seconds), 0).After(time.Now()) {
// the lock expired, drop
fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock})
file.Flock(goceph.LockUN|goceph.LockNB, getHash(l.AppName))
err = errtypes.NotFound("file was not locked")
l = nil
}
return
})

return l, getRevaError(err)
}

func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock, existingLockID string) error {
return errtypes.NotSupported("unimplemented")
func sameHolder(l1, l2 *provider.Lock) bool {
same := true
if l1.User != nil || l2.User != nil {
same = utils.UserEqual(l1.User, l2.User)
}
if l1.AppName != "" || l2.AppName != "" {
same = l1.AppName == l2.AppName
}
return same
}

func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, newLock *provider.Lock, existingLockID string) error {
oldLock, err := fs.GetLock(ctx, ref)
if err != nil {
switch err.(type) {
case errtypes.NotFound:
// the lock does not exist
return errtypes.BadRequest("file was not locked")
default:
return err
}
}

// check if the holder is the same of the new lock
if !sameHolder(oldLock, newLock) {
return errtypes.BadRequest("caller does not hold the lock")
}

if existingLockID != "" && oldLock.LockId != existingLockID {
return errtypes.BadRequest("mismatching existing lock id")
}

return fs.SetLock(ctx, ref, newLock)
}

func (fs *cephfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error {
return errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
if err != nil {
return getRevaError(err)
}

oldLock, err := fs.GetLock(ctx, ref)
if err != nil {
switch err.(type) {
case errtypes.NotFound:
// the lock does not exist
return errtypes.BadRequest("file not found or not locked")
default:
return err
}
}

// check if the lock id of the lock corresponds to the stored lock
if oldLock.LockId != lock.LockId {
return errtypes.BadRequest("lock id does not match")
}

if !sameHolder(oldLock, lock) {
return errtypes.BadRequest("caller does not hold the lock")
}

user.op(func(cv *cacheVal) {
var file *goceph.File
defer closeFile(file)
if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil {
return
}

if err = file.Flock(goceph.LockUN|goceph.LockNB, getHash(lock.AppName)); err != nil {
return
}
})

if err == nil {
return fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock})
}

return getRevaError(err)
}
2 changes: 1 addition & 1 deletion pkg/storage/fs/cephfs/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func getRevaError(err error) error {
}
switch err.Error() {
case errNotFound:
return errtypes.NotFound("cephfs: dir entry not found")
return errtypes.NotFound("cephfs: entry not found")
case errPermissionDenied:
return errtypes.PermissionDenied("cephfs: permission denied")
case errFileExists:
Expand Down
10 changes: 3 additions & 7 deletions pkg/storage/utils/eosfs/eosfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ const (
const EosLockKey = "app.lock"

// LockPayloadKey is the key in the xattrs used to store the lock payload.
const LockPayloadKey = "reva.lock.payload"
const LockPayloadKey = "reva.lockpayload"

var hiddenReg = regexp.MustCompile(`\.sys\..#.`)

Expand Down Expand Up @@ -887,7 +887,7 @@ func (fs *eosfs) RefreshLock(ctx context.Context, ref *provider.Reference, newLo
return errors.Wrap(err, "eosfs: cannot check if user has write access on resource")
}
if !has {
return errtypes.PermissionDenied(fmt.Sprintf("user %s has not write access on resource", user.Username))
return errtypes.PermissionDenied(fmt.Sprintf("user %s has no write access on resource", user.Username))
}

return fs.setLock(ctx, newLock, path)
Expand All @@ -906,10 +906,6 @@ func sameHolder(l1, l2 *provider.Lock) bool {

// Unlock removes an existing lock from the given reference.
func (fs *eosfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error {
if lock.Type == provider.LockType_LOCK_TYPE_SHARED {
return errtypes.NotSupported("shared lock not yet implemented")
}

oldLock, err := fs.GetLock(ctx, ref)
if err != nil {
switch err.(type) {
Expand Down Expand Up @@ -942,7 +938,7 @@ func (fs *eosfs) Unlock(ctx context.Context, ref *provider.Reference, lock *prov
return errors.Wrap(err, "eosfs: cannot check if user has write access on resource")
}
if !has {
return errtypes.PermissionDenied(fmt.Sprintf("user %s has not write access on resource", user.Username))
return errtypes.PermissionDenied(fmt.Sprintf("user %s has no write access on resource", user.Username))
}

path, err := fs.resolve(ctx, ref)
Expand Down

0 comments on commit cb09a64

Please sign in to comment.