diff --git a/CHANGELOG.md b/CHANGELOG.md index 21c81d5ba1d..9488996977d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ reva users. The changes are ordered by importance. Summary ------- + * Enh #1209: Reva CephFS module v0.2.1 * Fix #1619: Fixes for enabling file sharing in EOS * Fix #1576: Fix etag changing only once a second * Fix #1634: Mentix site authorization status changes @@ -43,6 +44,9 @@ Summary Details ------- + * Enh #1209: Reva CephFS module v0.2.1 + + Introduce a new module for reva based on cephfs * Bugfix #1619: Fixes for enabling file sharing in EOS diff --git a/Makefile b/Makefile index e2656c0bede..17f9350d47d 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ GIT_BRANCH=`git rev-parse --symbolic-full-name --abbrev-ref HEAD` GIT_DIRTY=`git diff-index --quiet HEAD -- || echo "dirty-"` VERSION=`git describe --always` GO_VERSION=`go version | awk '{print $$3}'` -BUILD_FLAGS="-w -extldflags "-static" -X main.gitCommit=${GIT_COMMIT} -X main.version=${VERSION} -X main.goVersion=${GO_VERSION} -X main.buildDate=${BUILD_DATE}" +BUILD_FLAGS="-w -extldflags "-dynamic" -X main.gitCommit=${GIT_COMMIT} -X main.version=${VERSION} -X main.goVersion=${GO_VERSION} -X main.buildDate=${BUILD_DATE}" LITMUS_URL_OLD="http://localhost:20080/remote.php/webdav" LITMUS_URL_NEW="http://localhost:20080/remote.php/dav/files/4c510ada-c86b-4815-8820-42cdf82c3d51" LITMUS_USERNAME="einstein" diff --git a/docs/content/en/docs/config/grpc/services/appprovider/_index.md b/docs/content/en/docs/config/grpc/services/appprovider/_index.md index c243fe79e87..f548d91748a 100644 --- a/docs/content/en/docs/config/grpc/services/appprovider/_index.md +++ b/docs/content/en/docs/config/grpc/services/appprovider/_index.md @@ -9,7 +9,7 @@ description: > # _struct: config_ {{% dir name="iopsecret" type="string" default="" %}} -The iopsecret used to connect to the wopiserver. [[Ref]](https://github.com/cs3org/reva/tree/master/internal/grpc/services/appprovider/appprovider.go#L59) +The iopsecret used to connect to the wopiserver. [[Ref]](https://github.com/cs3org/reva/tree/master/internal/grpc/services/appprovider/appprovider.go#L60) {{< highlight toml >}} [grpc.services.appprovider] iopsecret = "" @@ -17,7 +17,7 @@ iopsecret = "" {{% /dir %}} {{% dir name="wopiurl" type="string" default="" %}} -The wopiserver's URL. [[Ref]](https://github.com/cs3org/reva/tree/master/internal/grpc/services/appprovider/appprovider.go#L60) +The wopiserver's URL. [[Ref]](https://github.com/cs3org/reva/tree/master/internal/grpc/services/appprovider/appprovider.go#L61) {{< highlight toml >}} [grpc.services.appprovider] wopiurl = "" @@ -25,7 +25,7 @@ wopiurl = "" {{% /dir %}} {{% dir name="wopibridgeurl" type="string" default="" %}} -The wopibridge's URL. [[Ref]](https://github.com/cs3org/reva/tree/master/internal/grpc/services/appprovider/appprovider.go#L61) +The wopibridge's URL. [[Ref]](https://github.com/cs3org/reva/tree/master/internal/grpc/services/appprovider/appprovider.go#L62) {{< highlight toml >}} [grpc.services.appprovider] wopibridgeurl = "" diff --git a/docs/content/en/docs/config/packages/auth/manager/oidc/_index.md b/docs/content/en/docs/config/packages/auth/manager/oidc/_index.md index 5e457997938..6abc8039f10 100644 --- a/docs/content/en/docs/config/packages/auth/manager/oidc/_index.md +++ b/docs/content/en/docs/config/packages/auth/manager/oidc/_index.md @@ -9,7 +9,7 @@ description: > # _struct: config_ {{% dir name="insecure" type="bool" default=false %}} -Whether to skip certificate checks when sending requests. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L53) +Whether to skip certificate checks when sending requests. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L55) {{< highlight toml >}} [auth.manager.oidc] insecure = false @@ -17,7 +17,7 @@ insecure = false {{% /dir %}} {{% dir name="issuer" type="string" default="" %}} -The issuer of the OIDC token. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L54) +The issuer of the OIDC token. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L56) {{< highlight toml >}} [auth.manager.oidc] issuer = "" @@ -25,7 +25,7 @@ issuer = "" {{% /dir %}} {{% dir name="id_claim" type="string" default="sub" %}} -The claim containing the ID of the user. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L55) +The claim containing the ID of the user. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L57) {{< highlight toml >}} [auth.manager.oidc] id_claim = "sub" @@ -33,7 +33,7 @@ id_claim = "sub" {{% /dir %}} {{% dir name="uid_claim" type="string" default="" %}} -The claim containing the UID of the user. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L56) +The claim containing the UID of the user. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L58) {{< highlight toml >}} [auth.manager.oidc] uid_claim = "" @@ -41,7 +41,7 @@ uid_claim = "" {{% /dir %}} {{% dir name="gid_claim" type="string" default="" %}} -The claim containing the GID of the user. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L57) +The claim containing the GID of the user. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L59) {{< highlight toml >}} [auth.manager.oidc] gid_claim = "" @@ -49,7 +49,7 @@ gid_claim = "" {{% /dir %}} {{% dir name="gatewaysvc" type="string" default="" %}} -The endpoint at which the GRPC gateway is exposed. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L58) +The endpoint at which the GRPC gateway is exposed. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/auth/manager/oidc/oidc.go#L60) {{< highlight toml >}} [auth.manager.oidc] gatewaysvc = "" diff --git a/go.mod b/go.mod index 46e50b33f12..43dd44fcfa7 100644 --- a/go.mod +++ b/go.mod @@ -11,10 +11,12 @@ require ( github.com/aws/aws-sdk-go v1.38.40 github.com/bluele/gcache v0.0.2 github.com/c-bata/go-prompt v0.2.5 + github.com/ceph/go-ceph v0.9.0 github.com/cheggaaa/pb v1.0.29 github.com/coreos/go-oidc v2.2.1+incompatible github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e github.com/cs3org/go-cs3apis v0.0.0-20210507060801-f176760d55f4 + github.com/dgraph-io/ristretto v0.0.3 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59 github.com/go-ldap/ldap/v3 v3.3.0 @@ -31,6 +33,7 @@ require ( github.com/imdario/mergo v0.3.8 // indirect github.com/jedib0t/go-pretty v4.3.0+incompatible github.com/mattn/go-sqlite3 v2.0.3+incompatible + github.com/maxymania/go-system v0.0.0-20170110133659-647cc364bf0b github.com/minio/minio-go/v7 v7.0.10 github.com/mitchellh/copystructure v1.0.0 // indirect github.com/mitchellh/mapstructure v1.4.1 @@ -50,6 +53,7 @@ require ( golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/sys v0.0.0-20210423082822-04245dca01da golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 google.golang.org/grpc v1.37.1 diff --git a/go.sum b/go.sum index 9038a8daf71..3444f469479 100644 --- a/go.sum +++ b/go.sum @@ -105,6 +105,8 @@ github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/ceph/go-ceph v0.9.0 h1:DEGE+QRdRCKacDvhtD31OCAi3FUJVuFTDCn8VNgESAs= +github.com/ceph/go-ceph v0.9.0/go.mod h1:wd+keAOqrcsN//20VQnHBGtnBnY0KHl0PA024Ng8HfQ= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= @@ -444,6 +446,7 @@ github.com/gobuffalo/validate/v3 v3.2.0/go.mod h1:PrhDOdDHxtN8KUgMvF3TDL0r1YZXV4 github.com/gobuffalo/x v0.0.0-20181003152136-452098b06085/go.mod h1:WevpGD+5YOreDJznWevcn8NTmQEW5STSBgIkpkjzqXc= github.com/gobuffalo/x v0.0.0-20181007152206-913e47c59ca7/go.mod h1:9rDPXaB3kXdKWzMc4odGQQdG2e2DIEmANy5aSJ9yesY= github.com/gofrs/uuid v3.1.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gofrs/uuid/v3 v3.1.2/go.mod h1:xPwMqoocQ1L5G6pXX5BcE7N5jlzn2o19oqAKxwZW/kI= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -735,6 +738,8 @@ github.com/mattn/goveralls v0.0.6 h1:cr8Y0VMo/MnEZBjxNN/vh6G90SZ7IMb6lms1dzMoO+Y github.com/mattn/goveralls v0.0.6/go.mod h1:h8b4ow6FxSPMQHF6o2ve3qsclnffZjYTNEKmLesRwqw= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/maxymania/go-system v0.0.0-20170110133659-647cc364bf0b h1:Q53idHrTuQDDHyXaxZ6pUl0I9uyD6Z6uKFK3ocX6LzI= +github.com/maxymania/go-system v0.0.0-20170110133659-647cc364bf0b/go.mod h1:KirJrATYGbTyUwVR26xIkaipRqRcMRXBf8N5dacvGus= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -1309,6 +1314,7 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200501145240-bc7a7d42d5c3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/storage/fs/cephfs/cephfs.go b/pkg/storage/fs/cephfs/cephfs.go new file mode 100644 index 00000000000..607d5146c53 --- /dev/null +++ b/pkg/storage/fs/cephfs/cephfs.go @@ -0,0 +1,472 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package cephfs + +import ( + "context" + "io" + "net/url" + "os" + "path/filepath" + "strconv" + "strings" + + cephfs2 "github.com/ceph/go-ceph/cephfs" + "github.com/ceph/go-ceph/cephfs/admin" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/storage" + "github.com/cs3org/reva/pkg/storage/fs/registry" + "github.com/cs3org/reva/pkg/storage/utils/chunking" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "golang.org/x/sync/semaphore" +) + +type cephfs struct { + conf *Options + chunkHandler *chunking.ChunkHandler +} + +func init() { + registry.Register("cephfs", New) +} + +// New returns an implementation to of the storage.FS interface that talk to +// a ceph filesystem. +func New(m map[string]interface{}) (storage.FS, error) { + c := &Options{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error decoding conf") + return nil, err + } + + adminMnt = newConn(nil).mount + if adminMnt == nil { + return nil, errors.New("cephfs: admin connection could not be established") + } + sLock = semaphore.NewWeighted(usrLimit) + + var e error + if e = newCache(); e != nil { + return nil, errors.New("cephfs: can't create rCache") + } + if fsadm, e = admin.New(); e != nil { + return nil, errors.New("cephfs: can't create fs admin connection") + } + + return &cephfs{ + conf: c, + chunkHandler: chunking.NewChunkHandler(c.Uploads), + }, nil +} + +func (fs *cephfs) GetHome(ctx context.Context) (string, error) { + if fs.conf.DisableHome { + return "", errtypes.NotSupported("cephfs: GetHome() home supported disabled") + } + + u := fs.makeUser(ctx) + + return fsadm.SubVolumePath("cephfs", "reva", u.Username) +} + +func (fs *cephfs) CreateHome(ctx context.Context) (err error) { + if fs.conf.DisableHome { + return errtypes.NotSupported("cephfs: GetHome() home supported disabled") + } + + u := fs.makeUser(ctx) + + err = fsadm.CreateSubVolume("cephfs", "reva", u.Username, &admin.SubVolumeOptions{ + Size: admin.ByteCount(fs.conf.UserQuotaBytes), + Uid: int(u.UidNumber), + Gid: int(u.GidNumber), + Mode: 0755, + }) + + return +} + +func (fs *cephfs) CreateDir(ctx context.Context, fn string) error { + u := fs.makeUser(ctx) + + return u.exec1(func(cv cacheVal) error { + return cv.mount.MakeDir(fn, fs.conf.DirMode) + }) +} + +func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err error) { + var path string + u := fs.makeUser(ctx) + path, err = resolveRef(ref) + if err != nil { + return + } + + return u.exec1(func(cv cacheVal) error { + return cv.mount.Unlink(path) + }) +} + +func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference) (err error) { + var oldPath, newPath string + u := fs.makeUser(ctx) + oldPath, err = resolveRef(oldRef) + if err != nil { + return + } + newPath, err = resolveRef(newRef) + if err != nil { + return + } + + return u.exec1(func(cv cacheVal) error { + return cv.mount.Rename(oldPath, newPath) + }) +} + +func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []string) (*provider.ResourceInfo, error) { + u := fs.makeUser(ctx) + path, err := resolveRef(ref) + if err != nil { + return nil, err + } + + var iri interface{} + iri, err = u.exec2(func(cv cacheVal) (i interface{}, e error) { + var stat *cephfs2.CephStatx + stat, e = cv.mount.Statx(path, cephfs2.StatxAllStats, 0) + if e != nil { + return nil, e + } + + return fileAsResourceInfo(ctx, cv.mount, path, stat, mdKeys) + }) + + return iri.(*provider.ResourceInfo), err +} + +func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys []string) (files []*provider.ResourceInfo, err error) { + var path string + u := fs.makeUser(ctx) + path, err = resolveRef(ref) + if err != nil { + return + } + + var dir *cephfs2.Directory + err = u.exec1(func(cv cacheVal) (e error) { + dir, e = cv.mount.OpenDir(path) + if e != nil { + return + } + defer dir.Close() + + var entry *cephfs2.DirEntryPlus + var ri *provider.ResourceInfo + for entry, e = dir.ReadDirPlus(cephfs2.StatxAllStats, 0); entry != nil && e == nil; entry, e = dir.ReadDirPlus(cephfs2.StatxAllStats, 0) { + if entry.Name() == "." || entry.Name() == ".." { + continue + } //TODO: Maybe include? + + ri, err = fileAsResourceInfo(ctx, cv.mount, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys) + if ri == nil || err != nil { + continue + } + + files = append(files, ri) + } + + return + }) + + return +} + +func (fs *cephfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) { + u := fs.makeUser(ctx) + path, err := resolveRef(ref) + if err != nil { + return nil, errors.Wrap(err, "cephfs: error resolving ref") + } + + if strings.HasPrefix(path, fs.conf.ShareFolder) { + return nil, errtypes.PermissionDenied("cephfs: cannot download under the virtual share folder") + } + + ir, err := u.exec2(func(cv cacheVal) (interface{}, error) { + return cv.mount.Open(path, os.O_RDONLY, defaultFilePerm) + }) + + return ir.(*cephfs2.File), err +} + +func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (fvs []*provider.FileVersion, err error) { + var path string + u := fs.makeUser(ctx) + path, err = resolveRef(ref) + if err != nil { + return nil, errors.Wrap(err, "cephfs: error resolving ref") + } + + if strings.HasPrefix(path, fs.conf.ShareFolder) { + return nil, errtypes.PermissionDenied("cephfs: cannot list revisions under the virtual share folder") + } + + err = u.exec1(func(cv cacheVal) error { + dir, e := cv.mount.OpenDir(".snap") + if e != nil { + return e + } + + for d, _ := dir.ReadDir(); d != nil; d, _ = dir.ReadDir() { + if !strings.HasPrefix(d.Name(), "_") { + continue + } + version := strings.Trim(strings.TrimSuffix(d.Name(), cv.homeIno), "_") + + item, e := cv.mount.Statx(filepath.Join(d.Name(), path), cephfs2.StatxAllStats, 0) + if e != nil { + return e + } + fvs = append(fvs, &provider.FileVersion{ + Key: version, + Size: item.Size, + Mtime: uint64(item.Ctime.Sec), + }) + } + + return nil + }) + + return +} + +func (fs *cephfs) DownloadRevision(ctx context.Context, ref *provider.Reference, key string) (file io.ReadCloser, err error) { + var path string + u := fs.makeUser(ctx) + path, err = resolveRef(ref) + if err != nil { + return nil, errors.Wrap(err, "cephfs: error resolving ref") + } + + ifile, err := u.exec2(func(cv cacheVal) (interface{}, error) { + return cv.mount.Open(getRevisionPath(key, cv.homeIno, path), os.O_RDONLY, defaultFilePerm) + }) + + return ifile.(*cephfs2.File), err +} + +func (fs *cephfs) RestoreRevision(ctx context.Context, ref *provider.Reference, key string) (err error) { + var path string + u := fs.makeUser(ctx) + path, err = resolveRef(ref) + if err != nil { + return errors.Wrap(err, "cephfs: error resolving ref") + } + + return u.exec1(func(cv cacheVal) error { + src, e := cv.mount.Open(getRevisionPath(key, cv.homeIno, path), os.O_RDONLY, defaultFilePerm) + if e != nil { + return e + } + + dst, e := cv.mount.Open(path, os.O_WRONLY|os.O_TRUNC, defaultFilePerm) + if e != nil { + return e + } + + _, e = io.Copy(dst, src) + return e + }) +} + +func (fs *cephfs) ListRecycle(ctx context.Context) ([]*provider.RecycleItem, error) { + return nil, nil +} + +func (fs *cephfs) RestoreRecycleItem(ctx context.Context, key string, restorePath string) error { + panic("implement me") +} + +func (fs *cephfs) PurgeRecycleItem(ctx context.Context, key string) error { + return errors.New("cephfs: Recycled items can't be purged, they are handled by snapshots, which are read-only") +} + +func (fs *cephfs) EmptyRecycle(ctx context.Context) error { + return errors.New("cephfs: recycle is based on snapshots and can't be edited") +} + +func (fs *cephfs) GetPathByID(ctx context.Context, id *provider.ResourceId) (string, error) { + return "", errors.New("cephfs: file ids not implemented") +} + +func (fs *cephfs) AddGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { + var path string + u := fs.makeUser(ctx) + path, err = resolveRef(ref) + if err != nil { + return + } + + err = u.exec1(func(cv cacheVal) (e error) { + return changePerms(cv.mount, g, path, updateGrant) + }) + + return +} + +func (fs *cephfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { + var path string + u := fs.makeUser(ctx) + path, err = resolveRef(ref) + if err != nil { + return + } + + err = u.exec1(func(cv cacheVal) (e error) { + return changePerms(cv.mount, g, path, removeGrant) + }) + + return +} + +func (fs *cephfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { + var path string + u := fs.makeUser(ctx) + path, err = resolveRef(ref) + if err != nil { + return + } + + err = u.exec1(func(cv cacheVal) (e error) { + return changePerms(cv.mount, g, path, updateGrant) + }) + + return +} + +func (fs *cephfs) ListGrants(ctx context.Context, ref *provider.Reference) (glist []*provider.Grant, err error) { + var path string + u := fs.makeUser(ctx) + if path, err = resolveRef(ref); err != nil { + return + } + + err = u.exec1(func(cv cacheVal) error { + glist = getFullPermissionSet(cv.mount, path) + + if glist == nil { + return errors.New("cephfs: error listing grants on " + path) + } + + return nil + }) + + return +} + +func (fs *cephfs) GetQuota(ctx context.Context) (total uint64, used uint64, err error) { + u := fs.makeUser(ctx) + + err = u.exec1(func(cv cacheVal) error { + buf, err := cv.mount.GetXattr("/", "ceph.quota.max_bytes") + if err != nil { + total = 0 + } else { + total, _ = strconv.ParseUint(string(buf), 10, 64) + } + + buf, err = cv.mount.GetXattr("/", "ceph.dir.rbytes") + if err == nil { + used, err = strconv.ParseUint(string(buf), 10, 64) + } + + return err + }) + + return +} + +func (fs *cephfs) CreateReference(ctx context.Context, path string, targetURI *url.URL) (err error) { + if !strings.HasPrefix(path, fs.conf.ShareFolder) { + return errors.New("cephfs: can't create reference outside a share folder") + } + u := fs.makeUser(ctx) + + err = u.exec1(func(cv cacheVal) error { + return cv.mount.MakeDir(path, fs.conf.DirMode) + }) + if err != nil { + return + } + + err = u.exec1(func(cv cacheVal) error { + return cv.mount.SetXattr(path, "reva.stat.ref", []byte(targetURI.String()), 0) + }) + + return +} + +func (fs *cephfs) Shutdown(ctx context.Context) (err error) { + if adminMnt != nil { + _ = adminMnt.Release() + } + clearCache() + + return +} + +func (fs *cephfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Reference, md *provider.ArbitraryMetadata) error { + + u := fs.makeUser(ctx) + path, err := resolveRef(ref) + if err != nil { + return err + } + + return u.exec1(func(cv cacheVal) error { + for k, v := range md.Metadata { + if e := cv.mount.SetXattr(path, k, []byte(v), 0); e != nil { + return errors.Wrap(err, e.Error()) + } + } + + return nil + }) +} + +func (fs *cephfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) error { + u := fs.makeUser(ctx) + path, err := resolveRef(ref) + if err != nil { + return err + } + + return u.exec1(func(cv cacheVal) error { + for _, key := range keys { + if e := cv.mount.RemoveXattr(path, key); e != nil { + return errors.Wrap(err, e.Error()) + } + } + + return nil + }) +} diff --git a/pkg/storage/fs/cephfs/connections.go b/pkg/storage/fs/cephfs/connections.go new file mode 100644 index 00000000000..efda8cddecc --- /dev/null +++ b/pkg/storage/fs/cephfs/connections.go @@ -0,0 +1,108 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package cephfs + +import ( + "context" + "strconv" + + cephfs2 "github.com/ceph/go-ceph/cephfs" + "github.com/ceph/go-ceph/cephfs/admin" + "github.com/dgraph-io/ristretto" + "golang.org/x/sync/semaphore" +) + +var rCache *ristretto.Cache +var sLock *semaphore.Weighted +var lctx = context.Background() + +type cacheVal struct { + perm *cephfs2.UserPerm + mount *cephfs2.MountInfo + homeIno string +} + +// Mount type +type Mount = *cephfs2.MountInfo + +var adminMnt Mount +var fsadm *admin.FSAdmin + +var usrLimit int64 = 1e4 + +func newCache() (err error) { + rCache, err = ristretto.NewCache(&ristretto.Config{ + NumCounters: 1e7, + MaxCost: usrLimit, + BufferItems: 64, + OnEvict: func(key, conflict uint64, value interface{}, cost int64) { + v := value.(cacheVal) + v.perm.Destroy() + _ = v.mount.Release() + }, + }) + + return +} + +func clearCache() { + rCache.Clear() + rCache.Close() +} + +func finishConn(mt Mount) *cacheVal { + _ = mt.Release() + return nil +} + +func newConn(user *User) *cacheVal { + var homePath = "/" + var perm *cephfs2.UserPerm + var stat *cephfs2.CephStatx + mount, err := cephfs2.CreateMount() + if err != nil { + return finishConn(mount) + } + if err = mount.ReadDefaultConfigFile(); err != nil { + return finishConn(mount) + } + if err = mount.Init(); err != nil { + return finishConn(mount) + } + + if user != nil { + if homePath, err = fsadm.SubVolumePath("cephfs", "reva", user.Username); err != nil { + return finishConn(mount) + } + perm = cephfs2.NewUserPerm(int(user.UidNumber), int(user.GidNumber), []int{}) + if err = mount.SetMountPerms(perm); err != nil { + perm.Destroy() + return finishConn(mount) + } + } + if err = mount.MountWithRoot(homePath); err != nil { + return finishConn(mount) + } + stat, err = mount.Statx(homePath, cephfs2.StatxIno, 0) + if err != nil { + return finishConn(mount) + } + + return &cacheVal{perm: perm, mount: mount, homeIno: strconv.FormatUint(uint64(stat.Inode), 10)} +} diff --git a/pkg/storage/fs/cephfs/options.go b/pkg/storage/fs/cephfs/options.go new file mode 100644 index 00000000000..6a1119a9799 --- /dev/null +++ b/pkg/storage/fs/cephfs/options.go @@ -0,0 +1,34 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package cephfs + +// Options for the cephfs module +type Options struct { + Root string `mapstructure:"root"` + ShareFolder string `mapstructure:"share_folder"` + Uploads string `mapstructure:"uploads"` + DataDirectory string `mapstructure:"data_directory"` + RecycleBin string `mapstructure:"recycle_bin"` + Versions string `mapstructure:"versions"` + Shadow string `mapstructure:"shadow"` + References string `mapstructure:"references"` + DirMode uint32 `mapstructure:"dirmode"` + DisableHome bool `mapstructure:"disable_home"` + UserQuotaBytes uint64 `mapstructure:"user_quota_bytes"` +} diff --git a/pkg/storage/fs/cephfs/permissions.go b/pkg/storage/fs/cephfs/permissions.go new file mode 100644 index 00000000000..42de1db758e --- /dev/null +++ b/pkg/storage/fs/cephfs/permissions.go @@ -0,0 +1,233 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package cephfs + +import ( + "context" + "errors" + "reflect" + "strconv" + "strings" + + cephfs2 "github.com/ceph/go-ceph/cephfs" + grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/user" + "github.com/maxymania/go-system/posix_acl" +) + +var perms = map[rune][]int{ + 'r': {3, 4, 5, 7, 8, 9, 10, 16}, + 'w': {1, 2, 6, 11, 13, 14, 15}, + 'x': {0, 12, 17}, +} + +var pIndex = make(map[int]rune) + +func init() { + for k, arr := range perms { + for _, v := range arr { + pIndex[v] = k + } + } +} + +const ( + aclXattr = "system.posix_acl_access" +) + +func getPermissionSet(ctx context.Context, stat *cephfs2.CephStatx, mount Mount, path string) (perm *provider.ResourcePermissions) { + u := user.ContextMustGetUser(ctx) + + if int64(stat.Uid) == u.UidNumber || int64(stat.Gid) == u.GidNumber { + updatePerms(perm, "rwx", false) + return + } + + acls := &posix_acl.Acl{} + var xattr []byte + var err error + if xattr, err = mount.GetXattr(path, aclXattr); err != nil { + return + } + acls.Decode(xattr) + + for _, acl := range acls.List { + rwx := strings.Split(acl.String(), ":")[2] + switch acl.GetType() { + case posix_acl.ACL_USER: + if int64(acl.GetID()) == u.UidNumber { + updatePerms(perm, rwx, false) + } + case posix_acl.ACL_GROUP: + if int64(acl.GetID()) == u.GidNumber { + updatePerms(perm, rwx, false) + } + case posix_acl.ACL_MASK: + updatePerms(perm, rwx, true) + case posix_acl.ACL_OTHERS: + updatePerms(perm, rwx, false) + } + } + + return +} + +func getFullPermissionSet(mount Mount, path string) (permList []*provider.Grant) { + acls := &posix_acl.Acl{} + var xattr []byte + var err error + if xattr, err = mount.GetXattr(path, aclXattr); err != nil { + return + } + acls.Decode(xattr) + + permMap := make(map[uint32]*provider.Grant) + for _, acl := range acls.List { + rwx := strings.Split(acl.String(), ":")[2] + switch acl.GetType() { + case posix_acl.ACL_USER: + permMap[acl.GetID()] = &provider.Grant{ + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &provider.Grantee_UserId{UserId: &userpb.UserId{Idp: strconv.Itoa(int(acl.GetID()))}}, + }, + Permissions: &provider.ResourcePermissions{}, + } + updatePerms(permMap[acl.GetID()].Permissions, rwx, false) + case posix_acl.ACL_GROUP: + permMap[acl.GetID()] = &provider.Grant{ + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_GROUP, + Id: &provider.Grantee_GroupId{GroupId: &grouppb.GroupId{Idp: strconv.Itoa(int(acl.GetID()))}}, + }, + Permissions: &provider.ResourcePermissions{}, + } + updatePerms(permMap[acl.GetID()].Permissions, rwx, false) + } + } + + for _, value := range permMap { + permList = append(permList, value) + } + + return +} + +func permToInt(p *provider.ResourcePermissions) (result uint16) { + item := reflect.ValueOf(p).Elem() + fields := item.NumField() - 3 + rwx := uint16(4 | 2 | 1) + for i := 0; i < fields; i++ { + if item.Field(i).Bool() { + switch pIndex[i] { + case 'r': + result |= 4 + case 'w': + result |= 2 + case 'x': + result |= 1 + } + } + + if result == rwx { + return + } + } + + return +} + +const ( + updateGrant = iota + removeGrant +) + +func changePerms(mt Mount, grant *provider.Grant, path string, method int) (e error) { + buf, e := mt.GetXattr(path, aclXattr) + if e != nil { + return + } + acls := &posix_acl.Acl{} + acls.Decode(buf) + var id uint64 + var sid posix_acl.AclSID + + switch grant.Grantee.Type { + case provider.GranteeType_GRANTEE_TYPE_USER: + id, e = strconv.ParseUint(grant.Grantee.GetUserId().Idp, 10, 32) + if e != nil { + return e + } + sid.SetUid(uint32(id)) + case provider.GranteeType_GRANTEE_TYPE_GROUP: + id, e = strconv.ParseUint(grant.Grantee.GetGroupId().Idp, 10, 32) + if e != nil { + return e + } + sid.SetGid(uint32(id)) + default: + return errors.New("cephfs: invalid grantee type") + } + + var found = false + var i int + for i = range acls.List { + if acls.List[i].AclSID == sid { + found = true + } + } + + if method == updateGrant { + if found { + acls.List[i].Perm |= permToInt(grant.Permissions) + } else { + acls.List = append(acls.List, posix_acl.AclElement{ + AclSID: sid, + Perm: permToInt(grant.Permissions), + }) + } + } else { + if found { + acls.List[i].Perm &^= permToInt(grant.Permissions) + if acls.List[i].Perm == 0 { // remove empty grant + acls.List = append(acls.List[:i], acls.List[i+1:]...) + } + } + } + + e = mt.SetXattr(path, aclXattr, acls.Encode(), 0) + + return +} + +func updatePerms(rp *provider.ResourcePermissions, acl string, unset bool) { + for _, t := range "rwx" { + if strings.ContainsRune(acl, t) { + for _, i := range perms[t] { + reflect.ValueOf(rp).Elem().Field(i).SetBool(true) + } + } else if unset { + for _, i := range perms[t] { + reflect.ValueOf(rp).Elem().Field(i).SetBool(false) + } + } + } +} diff --git a/pkg/storage/fs/cephfs/upload.go b/pkg/storage/fs/cephfs/upload.go new file mode 100644 index 00000000000..e06229b96e1 --- /dev/null +++ b/pkg/storage/fs/cephfs/upload.go @@ -0,0 +1,409 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package cephfs + +import ( + "bytes" + "context" + "encoding/json" + "io" + "io/ioutil" + "os" + "path/filepath" + + cephfs2 "github.com/ceph/go-ceph/cephfs" + + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/storage/utils/chunking" + "github.com/cs3org/reva/pkg/user" + "github.com/google/uuid" + "github.com/pkg/errors" + tusd "github.com/tus/tusd/pkg/handler" +) + +var defaultFilePerm = uint32(0664) + +func closeFile(file *cephfs2.File) { + _ = file.Close() +} + +func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { + u := fs.makeUser(ctx) + upload, err := fs.GetUpload(ctx, ref.GetPath()) + if err != nil { + metadata := map[string]string{"sizedeferred": "true"} + uploadIDs, err := fs.InitiateUpload(ctx, ref, 0, metadata) + if err != nil { + return err + } + if upload, err = fs.GetUpload(ctx, uploadIDs["simple"]); err != nil { + return errors.Wrap(err, "cephfs: error retrieving upload") + } + } + + uploadInfo := upload.(*fileUpload) + + p := uploadInfo.info.Storage["InternalDestination"] + ok, err := chunking.IsChunked(p) + if err != nil { + return errors.Wrap(err, "cephfs: error checking path") + } + if ok { + var assembledFile string + p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) + if err != nil { + return err + } + if p == "" { + if err = uploadInfo.Terminate(ctx); err != nil { + return errors.Wrap(err, "cephfs: error removing auxiliary files") + } + return errtypes.PartialContent(ref.String()) + } + uploadInfo.info.Storage["InternalDestination"] = p + + fd, err := u.exec2(func(cv cacheVal) (interface{}, error) { + return cv.mount.Open(assembledFile, os.O_RDONLY, defaultFilePerm) + }) + if err != nil { + return errors.Wrap(err, "cephfs: error opening assembled file") + } + defer closeFile(fd.(*cephfs2.File)) + defer u.exec0(func(cv cacheVal) { + _ = cv.mount.Unlink(assembledFile) + }) + r = fd.(*cephfs2.File) + } + + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { + return errors.Wrap(err, "cephfs: error writing to binary file") + } + + return uploadInfo.FinishUpload(ctx) +} + +func (fs *cephfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { + np, err := resolveRef(ref) + if err != nil { + return nil, errors.Wrap(err, "cephfs: error resolving reference") + } + + info := tusd.FileInfo{ + MetaData: tusd.MetaData{ + "filename": filepath.Base(np), + "dir": filepath.Dir(np), + }, + Size: uploadLength, + } + + if metadata != nil { + if metadata["mtime"] != "" { + info.MetaData["mtime"] = metadata["mtime"] + } + if _, ok := metadata["sizedeferred"]; ok { + info.SizeIsDeferred = true + } + } + + upload, err := fs.NewUpload(ctx, info) + if err != nil { + return nil, err + } + + info, _ = upload.GetInfo(ctx) + + return map[string]string{ + "simple": info.ID, + "tus": info.ID, + }, nil +} + +// UseIn tells the tus upload middleware which extensions it supports. +func (fs *cephfs) UseIn(composer *tusd.StoreComposer) { + composer.UseCore(fs) + composer.UseTerminater(fs) +} + +func (fs *cephfs) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tusd.Upload, err error) { + log := appctx.GetLogger(ctx) + log.Debug().Interface("info", info).Msg("cephfs: NewUpload") + + usr := fs.makeUser(ctx) + + fn := info.MetaData["filename"] + if fn == "" { + return nil, errors.New("cephfs: missing filename in metadata") + } + info.MetaData["filename"] = filepath.Clean(info.MetaData["filename"]) + + dir := info.MetaData["dir"] + if dir == "" { + return nil, errors.New("cephfs: missing dir in metadata") + } + info.MetaData["dir"] = filepath.Clean(info.MetaData["dir"]) + + np := filepath.Join(info.MetaData["dir"], info.MetaData["filename"]) + + log.Debug().Interface("info", info).Msg("cephfs: resolved filename") + + info.ID = uuid.New().String() + + binPath, err := fs.getUploadPath(ctx, info.ID) + if err != nil { + return nil, errors.Wrap(err, "cephfs: error resolving upload path") + } + + info.Storage = map[string]string{ + "Type": "LocalStore", + "BinPath": binPath, + "InternalDestination": np, + + "Idp": usr.Id.Idp, + "UserId": usr.Id.OpaqueId, + "UserName": usr.Username, + + "LogLevel": log.GetLevel().String(), + } + // Create binary file with no content + file, err := usr.exec2(func(cv cacheVal) (interface{}, error) { + return cv.mount.Open(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm) + }) + if err != nil { + return nil, err + } + defer closeFile(file.(*cephfs2.File)) + + u := &fileUpload{ + info: info, + binPath: binPath, + infoPath: binPath + ".info", + fs: fs, + ctx: ctx, + } + + if !info.SizeIsDeferred && info.Size == 0 { + log.Debug().Interface("info", info).Msg("cephfs: finishing upload for empty file") + // no need to create info file and finish directly + err := u.FinishUpload(ctx) + if err != nil { + return nil, err + } + return u, nil + } + + // writeInfo creates the file by itself if necessary + err = u.writeInfo() + if err != nil { + return nil, err + } + + return u, nil +} + +func (fs *cephfs) getUploadPath(ctx context.Context, uploadID string) (string, error) { + return filepath.Join(fs.conf.Uploads, uploadID), nil +} + +// GetUpload returns the Upload for the given upload id +func (fs *cephfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) { + usr := fs.makeUser(ctx) + binPath, err := fs.getUploadPath(ctx, id) + if err != nil { + return nil, err + } + infoPath := binPath + ".info" + info := tusd.FileInfo{} + data, err := ioutil.ReadFile(infoPath) + if err != nil { + return nil, err + } + if err := json.Unmarshal(data, &info); err != nil { + return nil, err + } + + stat, err := usr.exec2(func(cv cacheVal) (interface{}, error) { + return cv.mount.Statx(binPath, cephfs2.StatxSize, 0) + }) + if err != nil { + return nil, err + } + + info.Offset = int64(stat.(*cephfs2.CephStatx).Size) + + u := &userpb.User{ + Id: &userpb.UserId{ + Idp: info.Storage["Idp"], + OpaqueId: info.Storage["UserId"], + }, + Username: info.Storage["UserName"], + } + + ctx = user.ContextSetUser(ctx, u) + + return &fileUpload{ + info: info, + binPath: binPath, + infoPath: infoPath, + fs: fs, + ctx: ctx, + }, nil +} + +type fileUpload struct { + // info stores the current information about the upload + info tusd.FileInfo + // infoPath is the path to the .info file + infoPath string + // binPath is the path to the binary file (which has no extension) + binPath string + // only fs knows how to handle metadata and versions + fs *cephfs + // a context with a user + ctx context.Context +} + +// GetInfo returns the FileInfo +func (upload *fileUpload) GetInfo(ctx context.Context) (tusd.FileInfo, error) { + return upload.info, nil +} + +// GetReader returns an io.Reader for the upload +func (upload *fileUpload) GetReader(ctx context.Context) (io.Reader, error) { + return os.Open(upload.binPath) +} + +// WriteChunk writes the stream from the reader to the given offset of the upload +func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { + usr := upload.fs.makeUser(upload.ctx) + file, err := usr.exec2(func(cv cacheVal) (interface{}, error) { + return cv.mount.Open(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) + }) + if err != nil { + return 0, err + } + defer closeFile(file.(*cephfs2.File)) + + n, err := io.Copy(file.(*cephfs2.File), src) + + // If the HTTP PATCH request gets interrupted in the middle (e.g. because + // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. + // However, for OwnCloudStore it's not important whether the stream has ended + // on purpose or accidentally. + if err != nil { + if err != io.ErrUnexpectedEOF { + return n, err + } + } + + upload.info.Offset += n + err = upload.writeInfo() + + return n, err +} + +// writeInfo updates the entire information. Everything will be overwritten. +func (upload *fileUpload) writeInfo() error { + data, err := json.Marshal(upload.info) + if err != nil { + return err + } + u := upload.fs.makeUser(upload.ctx) + return u.exec1(func(cv cacheVal) (e error) { + var file *cephfs2.File + if file, e = cv.mount.Open(upload.infoPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, defaultFilePerm); e != nil { + return + } + defer file.Close() + + if _, e = io.Copy(file, bytes.NewReader(data)); e != nil { + return + } + return cv.mount.SyncFs() + }) +} + +// FinishUpload finishes an upload and moves the file to the internal destination +func (upload *fileUpload) FinishUpload(ctx context.Context) error { + + np := upload.info.Storage["InternalDestination"] + + // TODO check etag with If-Match header + // if destination exists + // if _, err := os.Stat(np); err == nil { + // the local storage does not store metadata + // the fileid is based on the path, so no we do not need to copy it to the new file + // the local storage does not track revisions + // } + + // if destination exists + // if _, err := os.Stat(np); err == nil { + // create revision + // if err := upload.fs.archiveRevision(upload.ctx, np); err != nil { + // return err + // } + // } + + u := upload.fs.makeUser(upload.ctx) + + err := u.exec1(func(cv cacheVal) error { + return cv.mount.Rename(upload.binPath, np) + }) + if err != nil { + return err + } + + // only delete the upload if it was successfully written to the fs + err = u.exec1(func(cv cacheVal) error { + return cv.mount.Unlink(upload.infoPath) + }) + if err != nil { + if !os.IsNotExist(err) { + log := appctx.GetLogger(ctx) + log.Err(err).Interface("info", upload.info).Msg("cephfs: could not delete upload info") + } + } + + // TODO: set mtime if specified in metadata + + return err +} + +// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination +// - the storage needs to implement AsTerminatableUpload +// - the upload needs to implement Terminate + +// AsTerminatableUpload returns a a TerminatableUpload +func (fs *cephfs) AsTerminatableUpload(upload tusd.Upload) tusd.TerminatableUpload { + return upload.(*fileUpload) +} + +// Terminate terminates the upload +func (upload *fileUpload) Terminate(ctx context.Context) error { + u := upload.fs.makeUser(upload.ctx) + + return u.exec1(func(cv cacheVal) error { + if e := cv.mount.Unlink(upload.infoPath); e != nil { + return e + } + return cv.mount.Unlink(upload.binPath) + }) +} diff --git a/pkg/storage/fs/cephfs/user.go b/pkg/storage/fs/cephfs/user.go new file mode 100644 index 00000000000..159435a2ff1 --- /dev/null +++ b/pkg/storage/fs/cephfs/user.go @@ -0,0 +1,217 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package cephfs + +import ( + "context" + "os" + "path/filepath" + "strconv" + + cephfs2 "github.com/ceph/go-ceph/cephfs" + userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/pkg/mime" + "github.com/cs3org/reva/pkg/user" + "github.com/pkg/errors" +) + +type callBack0 func(cb cacheVal) +type callBack func(cb cacheVal) error +type callBack2 func(cb cacheVal) (interface{}, error) + +// User custom type to add functionality to current struct +type User struct { + *userv1beta1.User +} + +func (fs *cephfs) makeUser(ctx context.Context) *User { + u := user.ContextMustGetUser(ctx) + + return &User{u} +} + +func (user *User) exec0(cb callBack0) { + if err := sLock.Acquire(lctx, 1); err != nil { + return + } + defer sLock.Release(1) + + val, found := rCache.Get(user) + if !found { + val = newConn(user) + // if val != nil { + rCache.Set(user, val, 1) + // } else { + // return + // } + } + + cb(val.(cacheVal)) +} + +func (user *User) exec1(cb callBack) error { + if err := sLock.Acquire(lctx, 1); err != nil { + return err + } + defer sLock.Release(1) + + val, found := rCache.Get(user) + if !found { + val = newConn(user) + // if val != nil { + rCache.Set(user, val, 1) + // } else { + // return errors.New("cephfs: Can't initiate client connection") + // } + } + + return cb(val.(cacheVal)) +} + +func (user *User) exec2(cb callBack2) (interface{}, error) { + if err := sLock.Acquire(lctx, 1); err != nil { + return nil, err + } + defer sLock.Release(1) + + val, found := rCache.Get(user) + if !found { + val = newConn(user) + // if val != nil { + rCache.Set(user, val, 1) + // } else { + // return nil, errors.New("cephfs: Can't initiate client connection") + // } + } + + return cb(val.(cacheVal)) +} + +func resolveRef(ref *provider.Reference) (str string, err error) { + if str = ref.GetPath(); str == "" { + return "", errors.New("cephfs: file ids not yet supported") + } + + return +} + +func fileAsResourceInfo(ctx context.Context, mt Mount, path string, stat *cephfs2.CephStatx, mdKeys []string) (ri *provider.ResourceInfo, err error) { + var ( + _type provider.ResourceType + target string + size uint64 + oid, idp, buf []byte + isDir = false + ) + + switch os.FileMode(stat.Mode) & os.ModeType { + case os.ModeDir: + _type = provider.ResourceType_RESOURCE_TYPE_CONTAINER + if buf, err = mt.GetXattr(path, "ceph.dir.rbytes"); err == nil { + size, err = strconv.ParseUint(string(buf), 10, 64) + } + isDir = true + case os.ModeSymlink: + _type = provider.ResourceType_RESOURCE_TYPE_SYMLINK + target, err = mt.Readlink(path) + case 0: + _type = provider.ResourceType_RESOURCE_TYPE_FILE + size = stat.Size + default: + return nil, errors.New("cephfs: unknown entry type") + } + if err != nil { + return + } + + if oid, err = mt.GetXattr(path, "reva.owner.oid"); err != nil { + return + } + if idp, err = mt.GetXattr(path, "reva.owner.idp"); err != nil { + return + } + uid := userv1beta1.UserId{ + Idp: string(idp), + OpaqueId: string(oid), + } + + var xattrs []string + keys := make(map[string]bool, len(mdKeys)) + for _, key := range mdKeys { + keys[key] = true + } + if keys["*"] { + mdKeys = []string{} + keys = map[string]bool{} + } + mx := make(map[string]string) + if xattrs, err = mt.ListXattr(path); err == nil { + for _, xattr := range xattrs { + if len(mdKeys) == 0 || keys[xattr] { + if buf, err := mt.GetXattr(path, xattr); err == nil { + mx[xattr] = string(buf) + } + } + } + } + + fid, ok := mx["reva.stat.fid"] + if !ok { + return nil, errors.New("cephfs: file has no fid") + } + id := &provider.ResourceId{OpaqueId: fid} + + var etag string + if isDir { + rctime, _ := mt.GetXattr(path, "ceph.dir.rctime") + etag = strconv.FormatUint(uint64(stat.Inode), 10) + ":" + string(rctime) + } else { + etag = strconv.FormatUint(uint64(stat.Inode), 10) + ":" + strconv.FormatInt(stat.Ctime.Sec, 10) + } + + mtime := &typesv1beta1.Timestamp{ + Seconds: uint64(stat.Mtime.Sec), + Nanos: uint32(stat.Mtime.Nsec), + } + + perms := getPermissionSet(ctx, stat, mt, path) + + ri = &provider.ResourceInfo{ + Type: _type, + Id: id, + Checksum: nil, + Etag: etag, + MimeType: mime.Detect(isDir, path), + Mtime: mtime, + Path: path, + PermissionSet: perms, + Size: size, + Owner: &uid, + Target: target, + ArbitraryMetadata: &provider.ArbitraryMetadata{Metadata: mx}, + } + + return +} + +func getRevisionPath(revKey string, homeIno string, path string) string { + return filepath.Join(".snap", "_"+revKey+"_"+homeIno, path) +} diff --git a/pkg/storage/fs/loader/loader.go b/pkg/storage/fs/loader/loader.go index dd08dc63540..70c015b7430 100644 --- a/pkg/storage/fs/loader/loader.go +++ b/pkg/storage/fs/loader/loader.go @@ -20,6 +20,7 @@ package loader import ( // Load core storage filesystem backends. + _ "github.com/cs3org/reva/pkg/storage/fs/cephfs" _ "github.com/cs3org/reva/pkg/storage/fs/eos" _ "github.com/cs3org/reva/pkg/storage/fs/eosgrpc" _ "github.com/cs3org/reva/pkg/storage/fs/eosgrpchome"