Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of CS3APIs Locks in the CephFS driver #4280

Merged
merged 11 commits into from
Oct 26, 2023
Merged
5 changes: 5 additions & 0 deletions changelog/unreleased/ceph-locks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: implementation of Locks for the CephFS driver

This PR brings CS3APIs Locks for CephFS

https://github.com/cs3org/reva/pull/4280
229 changes: 214 additions & 15 deletions pkg/storage/fs/cephfs/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,29 @@
package cephfs

import (
"bytes"
"context"
b64 "encoding/base64"
"encoding/binary"
"encoding/json"
"fmt"
"hash"
"hash/fnv"
"io"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"

cephfs2 "github.com/ceph/go-ceph/cephfs"
goceph "github.com/ceph/go-ceph/cephfs"
glpatcern marked this conversation as resolved.
Show resolved Hide resolved
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/cs3org/reva/pkg/utils"
"github.com/cs3org/reva/pkg/utils/cfg"
"github.com/pkg/errors"
)
Expand All @@ -49,6 +57,7 @@ const (
xattrRef = xattrTrustedNs + "ref"
xattrUserNs = "user."
snap = ".snap"
xattrLock = xattrUserNs + "reva.lockpayload"
)

type cephfs struct {
Expand Down Expand Up @@ -112,7 +121,7 @@ func (fs *cephfs) CreateHome(ctx context.Context) (err error) {
user := fs.makeUser(ctx)

// Stop createhome from running the whole thing because it is called multiple times
if _, err = fs.adminConn.adminMount.Statx(user.home, cephfs2.StatxMode, 0); err == nil {
if _, err = fs.adminConn.adminMount.Statx(user.home, goceph.StatxMode, 0); err == nil {
return
}

Expand Down Expand Up @@ -221,7 +230,7 @@ func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []s

user.op(func(cv *cacheVal) {
var stat Statx
if stat, err = cv.mount.Statx(path, cephfs2.StatxBasicStats, 0); err != nil {
if stat, err = cv.mount.Statx(path, goceph.StatxBasicStats, 0); err != nil {
return
}
ri, err = user.fileAsResourceInfo(cv, path, stat, mdKeys)
Expand All @@ -245,15 +254,15 @@ func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKey
}

user.op(func(cv *cacheVal) {
var dir *cephfs2.Directory
var dir *goceph.Directory
if dir, err = cv.mount.OpenDir(path); err != nil {
return
}
defer closeDir(dir)

var entry *cephfs2.DirEntryPlus
var entry *goceph.DirEntryPlus
var ri *provider.ResourceInfo
for entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0) {
for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) {
if fs.conf.HiddenDirs[entry.Name()] {
continue
}
Expand Down Expand Up @@ -306,7 +315,7 @@ func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (f
err = errtypes.PermissionDenied("cephfs: cannot download under the virtual share folder")
return
}
var dir *cephfs2.Directory
var dir *goceph.Directory
if dir, err = cv.mount.OpenDir(".snap"); err != nil {
return
}
Expand All @@ -325,7 +334,7 @@ func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (f
if e != nil {
continue
}
stat, e = cv.mount.Statx(revPath, cephfs2.StatxMtime|cephfs2.StatxSize, 0)
stat, e = cv.mount.Statx(revPath, goceph.StatxMtime|goceph.StatxSize, 0)
if e != nil {
continue
}
Expand Down Expand Up @@ -372,7 +381,7 @@ func (fs *cephfs) RestoreRevision(ctx context.Context, ref *provider.Reference,
return
}

var src, dst *cephfs2.File
var src, dst *goceph.File
if src, err = cv.mount.Open(revPath, os.O_RDONLY, 0); err != nil {
return
}
Expand Down Expand Up @@ -575,7 +584,7 @@ func (fs *cephfs) TouchFile(ctx context.Context, ref *provider.Reference) error
}

user.op(func(cv *cacheVal) {
var file *cephfs2.File
var file *goceph.File
defer closeFile(file)
if file, err = cv.mount.Open(path, os.O_CREATE|os.O_WRONLY, fs.conf.FilePerms); err != nil {
return
Expand Down Expand Up @@ -615,18 +624,208 @@ func (fs *cephfs) UpdateStorageSpace(ctx context.Context, req *provider.UpdateSt
return nil, errtypes.NotSupported("unimplemented")
}

var fnvHash hash.Hash32 = fnv.New32a()
glpatcern marked this conversation as resolved.
Show resolved Hide resolved

func getHash(s string) uint64 {
fnvHash.Write([]byte(s))
defer fnvHash.Reset()
buf := bytes.NewReader(fnvHash.Sum(nil))
var res uint32
binary.Read(buf, binary.BigEndian, &res)
return uint64(res)
glpatcern marked this conversation as resolved.
Show resolved Hide resolved
}

func encodeLock(l *provider.Lock) string {
data, _ := json.Marshal(l)
return b64.StdEncoding.EncodeToString(data)
}

func decodeLock(content string) (*provider.Lock, error) {
d, err := b64.StdEncoding.DecodeString(content)
if err != nil {
return nil, err
}

l := new(provider.Lock)
err = json.Unmarshal(d, l)
if err != nil {
return nil, err
}

return l, nil
glpatcern marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO(lopresti) part of this logic is duplicated from eosfs.go, should be factored out
glpatcern marked this conversation as resolved.
Show resolved Hide resolved
func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error {
return errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
if err != nil {
return getRevaError(err)
}

op := goceph.LockEX
if lock.Type == provider.LockType_LOCK_TYPE_SHARED {
op = goceph.LockSH
}

user.op(func(cv *cacheVal) {
var file *goceph.File
defer closeFile(file)
if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil {
return
}

if err = file.Flock(op|goceph.LockNB, getHash(lock.AppName)); err != nil {
// already locked?
return
}
glpatcern marked this conversation as resolved.
Show resolved Hide resolved
})

if err == nil {
md := &provider.ArbitraryMetadata{
Metadata: map[string]string{
xattrLock: encodeLock(lock),
},
}
return fs.SetArbitraryMetadata(ctx, ref, md)
}

return getRevaError(err)
}

func (fs *cephfs) GetLock(ctx context.Context, ref *provider.Reference) (*provider.Lock, error) {
return nil, errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
if err != nil {
return nil, getRevaError(err)
}

var l *provider.Lock
user.op(func(cv *cacheVal) {
buf, errXattr := cv.mount.GetXattr(path, xattrLock)
if errXattr == nil {
if l, err = decodeLock(string(buf)); err != nil {
err = errors.Wrap(err, "malformed lock payload")
return
}
}

var file *goceph.File
defer closeFile(file)
if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil {
// try and open with read-only permissions: if this succeeds, we just return
// the metadata as is, otherwise we return the error on Open()
if file, err = cv.mount.Open(path, os.O_RDONLY, fs.conf.FilePerms); err != nil {
l = nil
}
return
}

if err = file.Flock(goceph.LockEX|goceph.LockNB, 0); err == nil {
// success means file was not locked
file.Flock(goceph.LockUN|goceph.LockNB, 0)
err = errtypes.NotFound("file was not locked")
glpatcern marked this conversation as resolved.
Show resolved Hide resolved
return
}

if errXattr != nil {
// error here means we have a "foreign" flock with no CS3 metadata
err = nil
l = new(provider.Lock)
l.AppName = "External"
return
}

if time.Unix(int64(l.Expiration.Seconds), 0).After(time.Now()) {
// the lock expired, drop
fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock})
file.Flock(goceph.LockUN|goceph.LockNB, getHash(l.AppName))
err = errtypes.NotFound("file was not locked")
l = nil
}
return
})

return l, getRevaError(err)
}

func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock, existingLockID string) error {
return errtypes.NotSupported("unimplemented")
func sameHolder(l1, l2 *provider.Lock) bool {
same := true
if l1.User != nil || l2.User != nil {
same = utils.UserEqual(l1.User, l2.User)
}
if l1.AppName != "" || l2.AppName != "" {
same = l1.AppName == l2.AppName
}
return same
}

func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, newLock *provider.Lock, existingLockID string) error {
oldLock, err := fs.GetLock(ctx, ref)
if err != nil {
switch err.(type) {
case errtypes.NotFound:
// the lock does not exist
return errtypes.BadRequest("file was not locked")
default:
return err
}
}

// check if the holder is the same of the new lock
if !sameHolder(oldLock, newLock) {
return errtypes.BadRequest("caller does not hold the lock")
}

if existingLockID != "" && oldLock.LockId != existingLockID {
return errtypes.BadRequest("lock id does not match")
}

return fs.SetLock(ctx, ref, newLock)
}

func (fs *cephfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error {
return errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
if err != nil {
return getRevaError(err)
}

oldLock, err := fs.GetLock(ctx, ref)
if err != nil {
switch err.(type) {
case errtypes.NotFound:
// the lock does not exist
return errtypes.BadRequest("file not found or not locked")
default:
return err
}
}

// check if the lock id of the lock corresponds to the stored lock
if oldLock.LockId != lock.LockId {
return errtypes.BadRequest("lock id does not match")
}

if !sameHolder(oldLock, lock) {
return errtypes.BadRequest("caller does not hold the lock")
}

user.op(func(cv *cacheVal) {
var file *goceph.File
defer closeFile(file)
if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil {
return
}

if err = file.Flock(goceph.LockUN|goceph.LockNB, getHash(lock.AppName)); err != nil {
return
}
glpatcern marked this conversation as resolved.
Show resolved Hide resolved
})

if err == nil {
return fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock})
}

return getRevaError(err)
}
10 changes: 5 additions & 5 deletions pkg/storage/fs/cephfs/chunking.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"strings"
"time"

cephfs2 "github.com/ceph/go-ceph/cephfs"
goceph "github.com/ceph/go-ceph/cephfs"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -118,7 +118,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu

chunkTempFilename := c.getChunkTempFileName()
c.user.op(func(cv *cacheVal) {
var tmpFile *cephfs2.File
var tmpFile *goceph.File
target := filepath.Join(c.chunkFolder, chunkTempFilename)
tmpFile, err = cv.mount.Open(target, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms)
defer closeFile(tmpFile)
Expand Down Expand Up @@ -152,9 +152,9 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu
// assembly the chunks when the client asks for it.
numEntries := 0
c.user.op(func(cv *cacheVal) {
var dir *cephfs2.Directory
var entry *cephfs2.DirEntry
var chunkFile, assembledFile *cephfs2.File
var dir *goceph.Directory
var entry *goceph.DirEntry
var chunkFile, assembledFile *goceph.File

dir, err = cv.mount.OpenDir(chunksFolderName)
defer closeDir(dir)
Expand Down
Loading