Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of CS3APIs Locks in the CephFS driver #4280

Merged
merged 11 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/ceph-locks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: implementation of Locks for the CephFS driver

This PR brings CS3APIs Locks for CephFS

https://github.com/cs3org/reva/pull/4280
222 changes: 207 additions & 15 deletions pkg/storage/fs/cephfs/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,25 @@ package cephfs

import (
"context"
b64 "encoding/base64"
"encoding/json"
"fmt"
"hash/fnv"
"io"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"

cephfs2 "github.com/ceph/go-ceph/cephfs"
goceph "github.com/ceph/go-ceph/cephfs"
glpatcern marked this conversation as resolved.
Show resolved Hide resolved
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/cs3org/reva/pkg/utils"
"github.com/cs3org/reva/pkg/utils/cfg"
"github.com/pkg/errors"
)
Expand All @@ -49,6 +54,7 @@ const (
xattrRef = xattrTrustedNs + "ref"
xattrUserNs = "user."
snap = ".snap"
xattrLock = xattrUserNs + "reva.lockpayload"
)

type cephfs struct {
Expand Down Expand Up @@ -112,7 +118,7 @@ func (fs *cephfs) CreateHome(ctx context.Context) (err error) {
user := fs.makeUser(ctx)

// Stop createhome from running the whole thing because it is called multiple times
if _, err = fs.adminConn.adminMount.Statx(user.home, cephfs2.StatxMode, 0); err == nil {
if _, err = fs.adminConn.adminMount.Statx(user.home, goceph.StatxMode, 0); err == nil {
return
}

Expand Down Expand Up @@ -221,7 +227,7 @@ func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []s

user.op(func(cv *cacheVal) {
var stat Statx
if stat, err = cv.mount.Statx(path, cephfs2.StatxBasicStats, 0); err != nil {
if stat, err = cv.mount.Statx(path, goceph.StatxBasicStats, 0); err != nil {
return
}
ri, err = user.fileAsResourceInfo(cv, path, stat, mdKeys)
Expand All @@ -245,15 +251,15 @@ func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKey
}

user.op(func(cv *cacheVal) {
var dir *cephfs2.Directory
var dir *goceph.Directory
if dir, err = cv.mount.OpenDir(path); err != nil {
return
}
defer closeDir(dir)

var entry *cephfs2.DirEntryPlus
var entry *goceph.DirEntryPlus
var ri *provider.ResourceInfo
for entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0) {
for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) {
if fs.conf.HiddenDirs[entry.Name()] {
continue
}
Expand Down Expand Up @@ -306,7 +312,7 @@ func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (f
err = errtypes.PermissionDenied("cephfs: cannot download under the virtual share folder")
return
}
var dir *cephfs2.Directory
var dir *goceph.Directory
if dir, err = cv.mount.OpenDir(".snap"); err != nil {
return
}
Expand All @@ -325,7 +331,7 @@ func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (f
if e != nil {
continue
}
stat, e = cv.mount.Statx(revPath, cephfs2.StatxMtime|cephfs2.StatxSize, 0)
stat, e = cv.mount.Statx(revPath, goceph.StatxMtime|goceph.StatxSize, 0)
if e != nil {
continue
}
Expand Down Expand Up @@ -372,7 +378,7 @@ func (fs *cephfs) RestoreRevision(ctx context.Context, ref *provider.Reference,
return
}

var src, dst *cephfs2.File
var src, dst *goceph.File
if src, err = cv.mount.Open(revPath, os.O_RDONLY, 0); err != nil {
return
}
Expand Down Expand Up @@ -575,7 +581,7 @@ func (fs *cephfs) TouchFile(ctx context.Context, ref *provider.Reference) error
}

user.op(func(cv *cacheVal) {
var file *cephfs2.File
var file *goceph.File
defer closeFile(file)
if file, err = cv.mount.Open(path, os.O_CREATE|os.O_WRONLY, fs.conf.FilePerms); err != nil {
return
Expand Down Expand Up @@ -615,18 +621,204 @@ func (fs *cephfs) UpdateStorageSpace(ctx context.Context, req *provider.UpdateSt
return nil, errtypes.NotSupported("unimplemented")
}

var fnvHash = fnv.New32a()

func getHash(s string) uint64 {
fnvHash.Write([]byte(s))
defer fnvHash.Reset()
return uint64(fnvHash.Sum32())
}

func encodeLock(l *provider.Lock) string {
data, _ := json.Marshal(l)
return b64.StdEncoding.EncodeToString(data)
}

func decodeLock(content string) (*provider.Lock, error) {
d, err := b64.StdEncoding.DecodeString(content)
if err != nil {
return nil, err
}

l := &provider.Lock{}
if err = json.Unmarshal(d, l); err != nil {
return nil, err
}

return l, nil
}

func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error {
return errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
if err != nil {
return getRevaError(err)
}

op := goceph.LockEX
if lock.Type == provider.LockType_LOCK_TYPE_SHARED {
op = goceph.LockSH
}

user.op(func(cv *cacheVal) {
var file *goceph.File
defer closeFile(file)
if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil {
return
}

err = file.Flock(op|goceph.LockNB, getHash(lock.AppName))
})

if err == nil {
// ok, we got the flock, now also store the related lock metadata
md := &provider.ArbitraryMetadata{
Metadata: map[string]string{
xattrLock: encodeLock(lock),
},
}
return fs.SetArbitraryMetadata(ctx, ref, md)
}

return getRevaError(err)
}

func (fs *cephfs) GetLock(ctx context.Context, ref *provider.Reference) (*provider.Lock, error) {
return nil, errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
if err != nil {
return nil, getRevaError(err)
}

var l *provider.Lock
user.op(func(cv *cacheVal) {
buf, errXattr := cv.mount.GetXattr(path, xattrLock)
if errXattr == nil {
if l, err = decodeLock(string(buf)); err != nil {
err = errors.Wrap(err, "malformed lock payload")
return
}
}

var file *goceph.File
defer closeFile(file)
if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil {
// try and open with read-only permissions: if this succeeds, we just return
// the metadata as is, otherwise we return the error on Open()
if file, err = cv.mount.Open(path, os.O_RDONLY, fs.conf.FilePerms); err != nil {
l = nil
}
return
}

if err = file.Flock(goceph.LockEX|goceph.LockNB, 0); err == nil {
// success means the file was not locked, drop related metadata if present
if l != nil {
fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock})
l = nil
}
file.Flock(goceph.LockUN|goceph.LockNB, 0)
err = errtypes.NotFound("file was not locked")
glpatcern marked this conversation as resolved.
Show resolved Hide resolved
return
}

if errXattr != nil {
// error here means we have a "foreign" flock with no CS3 metadata
err = nil
l = new(provider.Lock)
l.AppName = "External"
return
}

if time.Unix(int64(l.Expiration.Seconds), 0).After(time.Now()) {
// the lock expired, drop
fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock})
file.Flock(goceph.LockUN|goceph.LockNB, getHash(l.AppName))
err = errtypes.NotFound("file was not locked")
l = nil
}
return
})

return l, getRevaError(err)
}

func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock, existingLockID string) error {
return errtypes.NotSupported("unimplemented")
// TODO(lopresti) part of this logic is duplicated from eosfs.go, should be factored out
func sameHolder(l1, l2 *provider.Lock) bool {
same := true
if l1.User != nil || l2.User != nil {
same = utils.UserEqual(l1.User, l2.User)
}
if l1.AppName != "" || l2.AppName != "" {
same = l1.AppName == l2.AppName
}
return same
}

func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, newLock *provider.Lock, existingLockID string) error {
oldLock, err := fs.GetLock(ctx, ref)
if err != nil {
switch err.(type) {
case errtypes.NotFound:
// the lock does not exist
return errtypes.BadRequest("file was not locked")
default:
return err
}
}

// check if the holder is the same of the new lock
if !sameHolder(oldLock, newLock) {
return errtypes.BadRequest("caller does not hold the lock")
}

if existingLockID != "" && oldLock.LockId != existingLockID {
return errtypes.BadRequest("lock id does not match")
}

return fs.SetLock(ctx, ref, newLock)
}

func (fs *cephfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error {
return errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
if err != nil {
return getRevaError(err)
}

oldLock, err := fs.GetLock(ctx, ref)
if err != nil {
switch err.(type) {
case errtypes.NotFound:
// the lock does not exist
return errtypes.BadRequest("file not found or not locked")
default:
return err
}
}

// check if the lock id of the lock corresponds to the stored lock
if oldLock.LockId != lock.LockId {
return errtypes.BadRequest("lock id does not match")
}

if !sameHolder(oldLock, lock) {
return errtypes.BadRequest("caller does not hold the lock")
}

user.op(func(cv *cacheVal) {
var file *goceph.File
defer closeFile(file)
if file, err = cv.mount.Open(path, os.O_RDWR, fs.conf.FilePerms); err != nil {
return
}

err = file.Flock(goceph.LockUN|goceph.LockNB, getHash(lock.AppName))
})

if err == nil {
return fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock})
}

return getRevaError(err)
}
10 changes: 5 additions & 5 deletions pkg/storage/fs/cephfs/chunking.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"strings"
"time"

cephfs2 "github.com/ceph/go-ceph/cephfs"
goceph "github.com/ceph/go-ceph/cephfs"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -118,7 +118,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu

chunkTempFilename := c.getChunkTempFileName()
c.user.op(func(cv *cacheVal) {
var tmpFile *cephfs2.File
var tmpFile *goceph.File
target := filepath.Join(c.chunkFolder, chunkTempFilename)
tmpFile, err = cv.mount.Open(target, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms)
defer closeFile(tmpFile)
Expand Down Expand Up @@ -152,9 +152,9 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu
// assembly the chunks when the client asks for it.
numEntries := 0
c.user.op(func(cv *cacheVal) {
var dir *cephfs2.Directory
var entry *cephfs2.DirEntry
var chunkFile, assembledFile *cephfs2.File
var dir *goceph.Directory
var entry *goceph.DirEntry
var chunkFile, assembledFile *goceph.File

dir, err = cv.mount.OpenDir(chunksFolderName)
defer closeDir(dir)
Expand Down
Loading