diff --git a/.dockerignore b/.dockerignore index 429a7e9d469..ab22e55d143 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,7 +1,6 @@ .github changelog docs -examples grpc-tests tests tools diff --git a/.drone.star b/.drone.star index 3c60499d7df..92460ad3e3a 100644 --- a/.drone.star +++ b/.drone.star @@ -229,6 +229,26 @@ def buildAndPublishDocker(): ], }, }, + { + "name": "publish-docker-revad-ceph-latest", + "pull": "always", + "image": "plugins/docker", + "settings": { + "repo": "cs3org/revad", + "tags": "latest-ceph", + "dockerfile": "Dockerfile.revad-ceph", + "username": { + "from_secret": "dockerhub_username", + }, + "password": { + "from_secret": "dockerhub_password", + }, + "custom_dns": [ + "128.142.17.5", + "128.142.16.5", + ], + }, + }, ], } @@ -481,6 +501,26 @@ def release(): ], }, }, + { + "name": "docker-revad-ceph-tag", + "pull": "always", + "image": "plugins/docker", + "settings": { + "repo": "cs3org/revad", + "tags": "${DRONE_TAG}-ceph", + "dockerfile": "Dockerfile.revad-ceph", + "username": { + "from_secret": "dockerhub_username", + }, + "password": { + "from_secret": "dockerhub_password", + }, + "custom_dns": [ + "128.142.17.5", + "128.142.16.5", + ], + }, + }, ], "depends_on": ["changelog"], } diff --git a/Dockerfile.revad-ceph b/Dockerfile.revad-ceph new file mode 100644 index 00000000000..186acfcc63b --- /dev/null +++ b/Dockerfile.revad-ceph @@ -0,0 +1,52 @@ +# Copyright 2018-2021 CERN +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# In applying this license, CERN does not waive the privileges and immunities +# granted to it by virtue of its status as an Intergovernmental Organization +# or submit itself to any jurisdiction. + +FROM ceph/daemon-base + +RUN dnf update -y && dnf install -y \ + git \ + gcc \ + make \ + libcephfs-devel \ + librbd-devel \ + librados-devel + +ADD https://golang.org/dl/go1.16.4.linux-amd64.tar.gz \ + go1.16.4.linux-amd64.tar.gz + +RUN rm -rf /usr/local/go && \ + tar -C /usr/local -xzf go1.16.4.linux-amd64.tar.gz && \ + rm go1.16.4.linux-amd64.tar.gz + +ENV PATH /go/bin:/usr/local/go/bin:$PATH +ENV GOPATH /go + +WORKDIR /go/src/github/cs3org/reva +COPY . . +RUN mkdir -p /go/bin && \ + make build-revad-cephfs-docker && \ + cp /go/src/github/cs3org/reva/cmd/revad/revad /usr/bin/revad + +RUN cp -r examples/ceph /etc/ + +RUN mkdir -p /etc/revad/ && echo "" > /etc/revad/revad.toml + +EXPOSE 9999 10000 + +ENTRYPOINT [ "/usr/bin/revad" ] +CMD [ "-c", "/etc/revad/revad.toml", "-p", "/var/run/revad.pid" ] diff --git a/Makefile b/Makefile index 465a71ef9b0..b686fca1d93 100644 --- a/Makefile +++ b/Makefile @@ -30,10 +30,9 @@ off: imports: off `go env GOPATH`/bin/goimports -w tools pkg internal cmd -build: imports test-go-version - gofmt -s -w . 
- go build -ldflags ${BUILD_FLAGS} -o ./cmd/revad/revad ./cmd/revad - go build -ldflags ${BUILD_FLAGS} -o ./cmd/reva/reva ./cmd/reva +build: build-revad build-reva test-go-version + +build-cephfs: build-revad-cephfs build-reva tidy: go mod tidy @@ -41,6 +40,9 @@ tidy: build-revad: imports go build -ldflags ${BUILD_FLAGS} -o ./cmd/revad/revad ./cmd/revad +build-revad-cephfs: imports + go build -ldflags ${BUILD_FLAGS} -tags ceph -o ./cmd/revad/revad ./cmd/revad + build-reva: imports go build -ldflags ${BUILD_FLAGS} -o ./cmd/reva/reva ./cmd/reva @@ -104,6 +106,8 @@ ci: build-ci test lint-ci # to be run in Docker build build-revad-docker: off go build -ldflags ${BUILD_FLAGS} -o ./cmd/revad/revad ./cmd/revad +build-revad-cephfs-docker: off + go build -ldflags ${BUILD_FLAGS} -tags ceph -o ./cmd/revad/revad ./cmd/revad build-reva-docker: off go build -ldflags ${BUILD_FLAGS} -o ./cmd/reva/reva ./cmd/reva clean: diff --git a/changelog/unreleased/cephfs-driver.md b/changelog/unreleased/cephfs-driver.md new file mode 100644 index 00000000000..9015c61538f --- /dev/null +++ b/changelog/unreleased/cephfs-driver.md @@ -0,0 +1,3 @@ +Enhancement: Reva CephFS module v0.2.1 + +https://github.com/cs3org/reva/pull/1209 \ No newline at end of file diff --git a/docs/content/en/docs/config/packages/storage/fs/cephfs/_index.md b/docs/content/en/docs/config/packages/storage/fs/cephfs/_index.md new file mode 100644 index 00000000000..48dc231ebf3 --- /dev/null +++ b/docs/content/en/docs/config/packages/storage/fs/cephfs/_index.md @@ -0,0 +1,26 @@ +--- +title: "cephfs" +linkTitle: "cephfs" +weight: 10 +description: > + Configuration for the cephfs service +--- + +# _struct: config_ + +{{% dir name="root" type="string" default="/var/tmp/reva/" %}} +Path of root directory for user storage. 
[[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/cephfs/cephfs.go#L34) +{{< highlight toml >}} +[storage.fs.cephfs] +root = "/var/tmp/reva/" +{{< /highlight >}} +{{% /dir %}} + +{{% dir name="share_folder" type="string" default="/MyShares" %}} +Path for storing share references. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/cephfs/cephfs.go#L35) +{{< highlight toml >}} +[storage.fs.cephfs] +share_folder = "/MyShares" +{{< /highlight >}} +{{% /dir %}} + diff --git a/examples/ceph/ceph.conf b/examples/ceph/ceph.conf new file mode 100644 index 00000000000..f2daaa8de56 --- /dev/null +++ b/examples/ceph/ceph.conf @@ -0,0 +1,3 @@ +[global] + fsid = '8aaa35c4-75dc-42e5-a812-cbc1cdfd3323' + mon_host = '[v2:188.184.96.178:3300/0,v1:188.184.96.178:6789/0] [v2:188.185.88.76:3300/0,v1:188.185.88.76:6789/0] [v2:188.185.126.6:3300/0,v1:188.185.126.6:6789/0]' diff --git a/examples/ceph/keyring b/examples/ceph/keyring new file mode 100644 index 00000000000..9e555cc1b2f --- /dev/null +++ b/examples/ceph/keyring @@ -0,0 +1,2 @@ +[client.admin] + key = 'AQAu88Fg5iekGhAAeVP0Td05PuybytuRJgBRqA==' diff --git a/go.mod b/go.mod index 11802ea317e..53b98d812b2 100644 --- a/go.mod +++ b/go.mod @@ -13,11 +13,13 @@ require ( github.com/beevik/etree v1.1.0 github.com/bluele/gcache v0.0.2 github.com/c-bata/go-prompt v0.2.5 + github.com/ceph/go-ceph v0.12.0 github.com/cheggaaa/pb v1.0.29 github.com/coreos/go-oidc v2.2.1+incompatible github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e github.com/cs3org/go-cs3apis v0.0.0-20211214102047-7ce3134d7bf8 github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 + github.com/dgraph-io/ristretto v0.1.0 github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59 github.com/gdexlab/go-render v1.0.1 github.com/go-chi/chi/v5 v5.0.7 @@ -40,6 +42,7 @@ require ( github.com/jedib0t/go-pretty v4.3.0+incompatible github.com/juliangruber/go-intersect v1.1.0 github.com/mattn/go-sqlite3 v2.0.3+incompatible + 
github.com/maxymania/go-system v0.0.0-20170110133659-647cc364bf0b github.com/mileusna/useragent v1.0.2 github.com/minio/minio-go/v7 v7.0.20 github.com/mitchellh/copystructure v1.2.0 // indirect @@ -71,6 +74,7 @@ require ( go.opentelemetry.io/otel/trace v1.3.0 golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 golang.org/x/term v0.0.0-20210916214954-140adaaadfaf google.golang.org/genproto v0.0.0-20211021150943-2b146023228c diff --git a/go.sum b/go.sum index bf2b6d812ce..f5fcc5c13ba 100644 --- a/go.sum +++ b/go.sum @@ -98,6 +98,7 @@ github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d h1:Byv0BzEl github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/aws/aws-sdk-go v1.20.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= +github.com/aws/aws-sdk-go v1.35.24/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k= github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.40.11/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/aws/aws-sdk-go v1.41.13/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= @@ -120,6 +121,8 @@ github.com/c-bata/go-prompt v0.2.5 h1:3zg6PecEywxNn0xiqcXHD96fkbxghD+gdB2tbsYfl+ github.com/c-bata/go-prompt v0.2.5/go.mod h1:vFnjEGDIIA/Lib7giyE4E9c50Lvl8j0S+7FVlAwDAVw= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/ceph/go-ceph v0.12.0 h1:nlFgKQZXOFR4oMnzXsKwTr79Y6EYDwqTrpigICGy/Tw= +github.com/ceph/go-ceph v0.12.0/go.mod 
h1:mafFpf5Vg8Ai8Bd+FAMvKBHLmtdpTXdRP/TNq8XWegY= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= @@ -151,6 +154,10 @@ github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4a github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI= +github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= @@ -320,6 +327,8 @@ github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGt github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0= github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod 
h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -530,6 +539,8 @@ github.com/mattn/go-tty v0.0.3 h1:5OfyWorkyO7xP52Mq7tB36ajHDG5OHrmBGIS/DtakQI= github.com/mattn/go-tty v0.0.3/go.mod h1:ihxohKRERHTVzN+aSVRwACLCeqIoZAWpoICkkvrWyR0= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/maxymania/go-system v0.0.0-20170110133659-647cc364bf0b h1:Q53idHrTuQDDHyXaxZ6pUl0I9uyD6Z6uKFK3ocX6LzI= +github.com/maxymania/go-system v0.0.0-20170110133659-647cc364bf0b/go.mod h1:KirJrATYGbTyUwVR26xIkaipRqRcMRXBf8N5dacvGus= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/mileusna/useragent v1.0.2 h1:DgVKtiPnjxlb73z9bCwgdUvU2nQNQ97uhgfO8l9uz/w= github.com/mileusna/useragent v1.0.2/go.mod h1:3d8TOmwL/5I8pJjyVDteHtgDGcefrFUX4ccGOMKNYYc= @@ -935,6 +946,7 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200501145240-bc7a7d42d5c3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git 
a/pkg/storage/fs/cephfs/cephfs.go b/pkg/storage/fs/cephfs/cephfs.go new file mode 100644 index 00000000000..e71db2df755 --- /dev/null +++ b/pkg/storage/fs/cephfs/cephfs.go @@ -0,0 +1,591 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +//go:build ceph +// +build ceph + +package cephfs + +import ( + "context" + "fmt" + "io" + "net/url" + "os" + "path/filepath" + "strconv" + "strings" + + cephfs2 "github.com/ceph/go-ceph/cephfs" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/storage" + "github.com/cs3org/reva/pkg/storage/fs/registry" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" +) + +const ( + xattrTrustedNs = "trusted." + xattrEID = xattrTrustedNs + "eid" + xattrMd5 = xattrTrustedNs + "checksum" + xattrMd5ts = xattrTrustedNs + "checksumTS" + xattrRef = xattrTrustedNs + "ref" + xattrUserNs = "user." 
+ snap = ".snap" +) + +type cephfs struct { + conf *Options + conn *connections + adminConn *adminConn + chunkHandler *ChunkHandler +} + +func init() { + registry.Register("cephfs", New) +} + +// New returns an implementation of the storage.FS interface that talks to +// a ceph filesystem. +func New(m map[string]interface{}) (fs storage.FS, err error) { + c := &Options{} + if err = mapstructure.Decode(m, c); err != nil { + return nil, errors.Wrap(err, "error decoding conf") + } + + c.fillDefaults() + + var cache *connections + if cache, err = newCache(); err != nil { + return nil, errors.New("cephfs: can't create caches") + } + + adminConn := newAdminConn(c.IndexPool) + if adminConn == nil { + return nil, errors.New("cephfs: couldn't create admin connections") + } + + for _, dir := range []string{c.ShadowFolder, c.UploadFolder} { + err = adminConn.adminMount.MakeDir(dir, dirPermFull) + if err != nil && err.Error() != errFileExists { + return nil, errors.New("cephfs: can't initialise system dir " + dir + ":" + err.Error()) + } + } + + return &cephfs{ + conf: c, + conn: cache, + adminConn: adminConn, + }, nil +} + +func (fs *cephfs) GetHome(ctx context.Context) (string, error) { + if fs.conf.DisableHome { + return "", errtypes.NotSupported("cephfs: GetHome() home supported disabled") + } + + user := fs.makeUser(ctx) + + return user.home, nil +} + +func (fs *cephfs) CreateHome(ctx context.Context) (err error) { + if fs.conf.DisableHome { + return errtypes.NotSupported("cephfs: CreateHome() home supported disabled") + } + + user := fs.makeUser(ctx) + + // Stop createhome from running the whole thing because it is called multiple times + if _, err = fs.adminConn.adminMount.Statx(user.home, cephfs2.StatxMode, 0); err == nil { + return + } + + err = walkPath(user.home, func(path string) error { + return fs.adminConn.adminMount.MakeDir(path, dirPermDefault) + }, false) + if err != nil { + return getRevaError(err) + } + + err = fs.adminConn.adminMount.Chown(user.home,
uint32(user.UidNumber), uint32(user.GidNumber)) + if err != nil { + return getRevaError(err) + } + + err = fs.adminConn.adminMount.SetXattr(user.home, "ceph.quota.max_bytes", []byte(fmt.Sprint(fs.conf.UserQuotaBytes)), 0) + if err != nil { + return getRevaError(err) + } + + user.op(func(cv *cacheVal) { + err = cv.mount.MakeDir(removeLeadingSlash(fs.conf.ShareFolder), dirPermDefault) + if err != nil && err.Error() == errFileExists { + err = nil + } + }) + + return getRevaError(err) +} + +func (fs *cephfs) CreateDir(ctx context.Context, ref *provider.Reference) error { + user := fs.makeUser(ctx) + path, err := user.resolveRef(ref) + if err != nil { + return getRevaError(err) + } + + user.op(func(cv *cacheVal) { + if err = cv.mount.MakeDir(path, dirPermDefault); err != nil { + return + } + + //TODO(tmourati): Add entry id logic + }) + + return getRevaError(err) +} + +func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err error) { + var path string + user := fs.makeUser(ctx) + path, err = user.resolveRef(ref) + if err != nil { + return err + } + + user.op(func(cv *cacheVal) { + if err = cv.mount.Unlink(path); err != nil && err.Error() == errIsADirectory { + err = cv.mount.RemoveDir(path) + } + + //TODO(tmourati): Add entry id logic + }) + + //has already been deleted by direct mount + if err != nil && err.Error() == errNotFound { + return nil + } + + return getRevaError(err) +} + +func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference) (err error) { + var oldPath, newPath string + user := fs.makeUser(ctx) + if oldPath, err = user.resolveRef(oldRef); err != nil { + return + } + if newPath, err = user.resolveRef(newRef); err != nil { + return + } + + user.op(func(cv *cacheVal) { + if err = cv.mount.Rename(oldPath, newPath); err != nil { + return + } + + //TODO(tmourati): Add entry id logic, handle already moved file error + }) + + // has already been moved by direct mount + if err != nil && err.Error() == errNotFound { + 
return nil + } + + return getRevaError(err) +} + +func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []string) (ri *provider.ResourceInfo, err error) { + var path string + user := fs.makeUser(ctx) + + if path, err = user.resolveRef(ref); err != nil { + return nil, err + } + + user.op(func(cv *cacheVal) { + var stat Statx + if stat, err = cv.mount.Statx(path, cephfs2.StatxBasicStats, 0); err != nil { + return + } + ri, err = user.fileAsResourceInfo(cv, path, stat, mdKeys) + }) + + return ri, getRevaError(err) +} + +func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys []string) (files []*provider.ResourceInfo, err error) { + var path string + user := fs.makeUser(ctx) + if path, err = user.resolveRef(ref); err != nil { + return + } + + user.op(func(cv *cacheVal) { + var dir *cephfs2.Directory + if dir, err = cv.mount.OpenDir(path); err != nil { + return + } + defer closeDir(dir) + + var entry *cephfs2.DirEntryPlus + var ri *provider.ResourceInfo + for entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(cephfs2.StatxBasicStats, 0) { + if fs.conf.HiddenDirs[entry.Name()] { + continue + } + + ri, err = user.fileAsResourceInfo(cv, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys) + if ri == nil || err != nil { + if err != nil { + log := appctx.GetLogger(ctx) + log.Err(err).Msg("cephfs: error in file as resource info") + } + err = nil + continue + } + + files = append(files, ri) + } + }) + + return files, getRevaError(err) +} + +func (fs *cephfs) Download(ctx context.Context, ref *provider.Reference) (rc io.ReadCloser, err error) { + var path string + user := fs.makeUser(ctx) + if path, err = user.resolveRef(ref); err != nil { + return nil, errors.Wrap(err, "cephfs: error resolving ref") + } + + user.op(func(cv *cacheVal) { + if strings.HasPrefix(strings.TrimPrefix(path, user.home), fs.conf.ShareFolder) { + err = errtypes.PermissionDenied("cephfs: 
cannot download under the virtual share folder") + return + } + rc, err = cv.mount.Open(path, os.O_RDONLY, 0) + }) + + return rc, getRevaError(err) +} + +func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (fvs []*provider.FileVersion, err error) { + //TODO(tmourati): Fix entry id logic + var path string + user := fs.makeUser(ctx) + if path, err = user.resolveRef(ref); err != nil { + return nil, errors.Wrap(err, "cephfs: error resolving ref") + } + + user.op(func(cv *cacheVal) { + if strings.HasPrefix(path, removeLeadingSlash(fs.conf.ShareFolder)) { + err = errtypes.PermissionDenied("cephfs: cannot download under the virtual share folder") + return + } + var dir *cephfs2.Directory + if dir, err = cv.mount.OpenDir(".snap"); err != nil { + return + } + defer closeDir(dir) + + for d, _ := dir.ReadDir(); d != nil; d, _ = dir.ReadDir() { + var revPath string + var stat Statx + var e error + + if strings.HasPrefix(d.Name(), ".") { + continue + } + + revPath, e = resolveRevRef(cv.mount, ref, d.Name()) + if e != nil { + continue + } + stat, e = cv.mount.Statx(revPath, cephfs2.StatxMtime|cephfs2.StatxSize, 0) + if e != nil { + continue + } + fvs = append(fvs, &provider.FileVersion{ + Key: d.Name(), + Size: stat.Size, + Mtime: uint64(stat.Mtime.Sec), + }) + } + }) + + return fvs, getRevaError(err) +} + +func (fs *cephfs) DownloadRevision(ctx context.Context, ref *provider.Reference, key string) (file io.ReadCloser, err error) { + //TODO(tmourati): Fix entry id logic + user := fs.makeUser(ctx) + + user.op(func(cv *cacheVal) { + var revPath string + revPath, err = resolveRevRef(cv.mount, ref, key) + if err != nil { + return + } + + file, err = cv.mount.Open(revPath, os.O_RDONLY, 0) + }) + + return file, getRevaError(err) +} + +func (fs *cephfs) RestoreRevision(ctx context.Context, ref *provider.Reference, key string) (err error) { + //TODO(tmourati): Fix entry id logic + var path string + user := fs.makeUser(ctx) + if path, err = user.resolveRef(ref); 
err != nil { + return errors.Wrap(err, "cephfs: error resolving ref") + } + + user.op(func(cv *cacheVal) { + var revPath string + if revPath, err = resolveRevRef(cv.mount, ref, key); err != nil { + err = errors.Wrap(err, "cephfs: error resolving revision ref "+ref.String()) + return + } + + var src, dst *cephfs2.File + if src, err = cv.mount.Open(revPath, os.O_RDONLY, 0); err != nil { + return + } + defer closeFile(src) + + if dst, err = cv.mount.Open(path, os.O_WRONLY|os.O_TRUNC, 0); err != nil { + return + } + defer closeFile(dst) + + _, err = io.Copy(dst, src) + }) + + return getRevaError(err) +} + +func (fs *cephfs) GetPathByID(ctx context.Context, id *provider.ResourceId) (str string, err error) { + //TODO(tmourati): Add entry id logic + return "", errtypes.NotSupported("cephfs: entry IDs currently not supported") +} + +func (fs *cephfs) AddGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { + var path string + user := fs.makeUser(ctx) + if path, err = user.resolveRef(ref); err != nil { + return + } + + user.op(func(cv *cacheVal) { + err = fs.changePerms(ctx, cv.mount, g, path, updateGrant) + }) + + return getRevaError(err) +} + +func (fs *cephfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { + var path string + user := fs.makeUser(ctx) + if path, err = user.resolveRef(ref); err != nil { + return + } + + user.op(func(cv *cacheVal) { + err = fs.changePerms(ctx, cv.mount, g, path, removeGrant) + }) + + return getRevaError(err) +} + +func (fs *cephfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { + var path string + user := fs.makeUser(ctx) + if path, err = user.resolveRef(ref); err != nil { + return + } + + user.op(func(cv *cacheVal) { + err = fs.changePerms(ctx, cv.mount, g, path, updateGrant) + }) + + return getRevaError(err) +} + +func (fs *cephfs) DenyGrant(ctx context.Context, ref *provider.Reference, g *provider.Grantee) (err error) { 
+ var path string + user := fs.makeUser(ctx) + if path, err = user.resolveRef(ref); err != nil { + return + } + + user.op(func(cv *cacheVal) { + grant := &provider.Grant{Grantee: g} //nil perms will remove the whole grant + err = fs.changePerms(ctx, cv.mount, grant, path, removeGrant) + }) + + return getRevaError(err) +} + +func (fs *cephfs) ListGrants(ctx context.Context, ref *provider.Reference) (glist []*provider.Grant, err error) { + var path string + user := fs.makeUser(ctx) + if path, err = user.resolveRef(ref); err != nil { + return + } + + user.op(func(cv *cacheVal) { + glist = fs.getFullPermissionSet(ctx, cv.mount, path) + + if glist == nil { + err = errors.New("cephfs: error listing grants on " + path) + } + }) + + return glist, getRevaError(err) +} + +func (fs *cephfs) GetQuota(ctx context.Context, ref *provider.Reference) (total uint64, used uint64, err error) { + user := fs.makeUser(ctx) + + log := appctx.GetLogger(ctx) + user.op(func(cv *cacheVal) { + var buf []byte + buf, err = cv.mount.GetXattr(".", "ceph.quota.max_bytes") + if err != nil { + log.Warn().Msg("cephfs: user quota bytes not set") + total = fs.conf.UserQuotaBytes + } else { + total, _ = strconv.ParseUint(string(buf), 10, 64) + } + + buf, err = cv.mount.GetXattr(".", "ceph.dir.rbytes") + if err == nil { + used, err = strconv.ParseUint(string(buf), 10, 64) + } + }) + + return total, used, getRevaError(err) +} + +func (fs *cephfs) CreateReference(ctx context.Context, path string, targetURI *url.URL) (err error) { + user := fs.makeUser(ctx) + + user.op(func(cv *cacheVal) { + if !strings.HasPrefix(strings.TrimPrefix(path, user.home), fs.conf.ShareFolder) { + err = errors.New("cephfs: can't create reference outside a share folder") + } else { + err = cv.mount.MakeDir(path, dirPermDefault) + } + }) + if err != nil { + return getRevaError(err) + } + + user.op(func(cv *cacheVal) { + err = cv.mount.SetXattr(path, xattrRef, []byte(targetURI.String()), 0) + }) + + return getRevaError(err) +} + +func 
(fs *cephfs) Shutdown(ctx context.Context) (err error) { + ctx.Done() + fs.conn.clearCache() + _ = fs.adminConn.adminMount.Unmount() + _ = fs.adminConn.adminMount.Release() + fs.adminConn.radosConn.Shutdown() + + return +} + +func (fs *cephfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Reference, md *provider.ArbitraryMetadata) (err error) { + var path string + user := fs.makeUser(ctx) + if path, err = user.resolveRef(ref); err != nil { + return err + } + + user.op(func(cv *cacheVal) { + for k, v := range md.Metadata { + if !strings.HasPrefix(k, xattrUserNs) { + k = xattrUserNs + k + } + if e := cv.mount.SetXattr(path, k, []byte(v), 0); e != nil { + err = e + return + } + } + }) + + return getRevaError(err) +} + +func (fs *cephfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) (err error) { + var path string + user := fs.makeUser(ctx) + if path, err = user.resolveRef(ref); err != nil { + return err + } + + user.op(func(cv *cacheVal) { + for _, key := range keys { + if !strings.HasPrefix(key, xattrUserNs) { + key = xattrUserNs + key + } + if e := cv.mount.RemoveXattr(path, key); e != nil { + err = e + return + } + } + }) + + return getRevaError(err) +} + +func (fs *cephfs) EmptyRecycle(ctx context.Context) error { + return errtypes.NotSupported("cephfs: empty recycle not supported") +} + +func (fs *cephfs) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (r *provider.CreateStorageSpaceResponse, err error) { + return nil, errors.New("cephfs: createStorageSpace not supported") +} + +func (fs *cephfs) ListRecycle(ctx context.Context, basePath, key, relativePath string) ([]*provider.RecycleItem, error) { + panic("implement me") +} + +func (fs *cephfs) RestoreRecycleItem(ctx context.Context, basePath, key, relativePath string, restoreRef *provider.Reference) error { + return errors.New("cephfs: restoreRecycleItem not supported") +} + +func
(fs *cephfs) PurgeRecycleItem(ctx context.Context, basePath, key, relativePath string) error { + return errors.New("cephfs: purgeRecycleItem not supported") +} + +func (fs *cephfs) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter, permissions map[string]struct{}) ([]*provider.StorageSpace, error) { + return nil, errors.New("cephfs: listStorageSpaces not supported") +} + +func (fs *cephfs) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorageSpaceRequest) (*provider.UpdateStorageSpaceResponse, error) { + return nil, errors.New("cephfs: updateStorageSpace not supported") +} diff --git a/pkg/storage/fs/cephfs/chunking.go b/pkg/storage/fs/cephfs/chunking.go new file mode 100644 index 00000000000..bb4e48fe2c7 --- /dev/null +++ b/pkg/storage/fs/cephfs/chunking.go @@ -0,0 +1,344 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+ +//go:build ceph +// +build ceph + +package cephfs + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" + "time" + + cephfs2 "github.com/ceph/go-ceph/cephfs" + "github.com/google/uuid" +) + +// IsChunked checks if a given path refers to a chunk or not +func IsChunked(fn string) (bool, error) { + // FIXME: also need to check whether the OC-Chunked header is set + return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn) +} + +// ChunkBLOBInfo stores info about a particular chunk +type ChunkBLOBInfo struct { + Path string + TransferID string + TotalChunks int + CurrentChunk int +} + +// Not using the resource path in the chunk folder name allows uploading to +// the same folder after a move without having to restart the chunk upload +func (c *ChunkBLOBInfo) uploadID() string { + return fmt.Sprintf("chunking-%s-%d", c.TransferID, c.TotalChunks) +} + +// GetChunkBLOBInfo decodes a chunk name to retrieve info about it. +func GetChunkBLOBInfo(path string) (*ChunkBLOBInfo, error) { + parts := strings.Split(path, "-chunking-") + tail := strings.Split(parts[1], "-") + + totalChunks, err := strconv.Atoi(tail[1]) + if err != nil { + return nil, err + } + + currentChunk, err := strconv.Atoi(tail[2]) + if err != nil { + return nil, err + } + if currentChunk >= totalChunks { + return nil, fmt.Errorf("current chunk:%d exceeds total number of chunks:%d", currentChunk, totalChunks) + } + + return &ChunkBLOBInfo{ + Path: parts[0], + TransferID: tail[0], + TotalChunks: totalChunks, + CurrentChunk: currentChunk, + }, nil +} + +// ChunkHandler manages chunked uploads, storing the chunks in a temporary directory +// until it gets the final chunk which is then returned. +type ChunkHandler struct { + user *User + chunkFolder string +} + +// NewChunkHandler creates a handler for chunked uploads. 
+func NewChunkHandler(ctx context.Context, fs *cephfs) *ChunkHandler {
+	return &ChunkHandler{fs.makeUser(ctx), fs.conf.UploadFolder}
+}
+
+func (c *ChunkHandler) getChunkTempFileName() string {
+	return fmt.Sprintf("__%d_%s", time.Now().Unix(), uuid.New().String())
+}
+
+func (c *ChunkHandler) getChunkFolderName(i *ChunkBLOBInfo) (path string, err error) {
+	path = filepath.Join(c.chunkFolder, i.uploadID())
+	c.user.op(func(cv *cacheVal) {
+		err = cv.mount.MakeDir(path, 0777)
+	})
+
+	return
+}
+
+// saveChunk stores one incoming chunk and, once every chunk of the transfer
+// is present, assembles them into a temporary file inside the chunk folder.
+// It reports finish=true together with the assembled file's path only when
+// the transfer is complete and assembly succeeded; the caller is responsible
+// for reading and removing the assembled file.
+func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chunk string, err error) {
+	var chunkInfo *ChunkBLOBInfo
+
+	chunkInfo, err = GetChunkBLOBInfo(path)
+	if err != nil {
+		err = fmt.Errorf("error getting chunk info from path: %s", path)
+		return
+	}
+
+	chunkTempFilename := c.getChunkTempFileName()
+	// The temp file is created inside the chunk folder; keep its full path so
+	// the rename below refers to the file we actually wrote.
+	chunkTempPath := filepath.Join(c.chunkFolder, chunkTempFilename)
+	c.user.op(func(cv *cacheVal) {
+		var tmpFile *cephfs2.File
+		tmpFile, err = cv.mount.Open(chunkTempPath, os.O_CREATE|os.O_WRONLY, filePermDefault)
+		defer closeFile(tmpFile)
+		if err != nil {
+			return
+		}
+		_, err = io.Copy(tmpFile, r)
+	})
+	if err != nil {
+		return
+	}
+
+	chunksFolderName, err := c.getChunkFolderName(chunkInfo)
+	if err != nil {
+		return
+	}
+
+	chunkTarget := filepath.Join(chunksFolderName, strconv.Itoa(chunkInfo.CurrentChunk))
+	c.user.op(func(cv *cacheVal) {
+		// BUGFIX: rename by the full temp-file path; the bare file name was
+		// resolved relative to the mount's working directory, not the chunk
+		// folder where the file was created.
+		err = cv.mount.Rename(chunkTempPath, chunkTarget)
+	})
+	if err != nil {
+		return
+	}
+
+	// Check that all chunks are uploaded.
+	// This is very inefficient, the server has to check that it has all the
+	// chunks after each uploaded chunk.
+	// A two-phase upload like DropBox is better, because the server will
+	// assembly the chunks when the client asks for it.
+	numEntries := 0
+	c.user.op(func(cv *cacheVal) {
+		var dir *cephfs2.Directory
+		var entry *cephfs2.DirEntry
+		var chunkFile, assembledFile *cephfs2.File
+
+		dir, err = cv.mount.OpenDir(chunksFolderName)
+		defer closeDir(dir)
+
+		for entry, err = dir.ReadDir(); entry != nil && err == nil; entry, err = dir.ReadDir() {
+			numEntries++
+		}
+		// to remove . and ..
+		numEntries -= 2
+
+		if err != nil || numEntries < chunkInfo.TotalChunks {
+			return
+		}
+
+		chunk = filepath.Join(c.chunkFolder, c.getChunkTempFileName())
+		assembledFile, err = cv.mount.Open(chunk, os.O_CREATE|os.O_WRONLY, filePermDefault)
+		defer closeFile(assembledFile)
+		// BUGFIX: only discard the assembled file when assembly failed; the
+		// unconditional delete removed it before the caller could read it.
+		defer func() {
+			if err != nil {
+				deleteFile(cv.mount, chunk)
+			}
+		}()
+		if err != nil {
+			return
+		}
+
+		for i := 0; i < numEntries; i++ {
+			target := filepath.Join(chunksFolderName, strconv.Itoa(i))
+
+			chunkFile, err = cv.mount.Open(target, os.O_RDONLY, 0)
+			if err != nil {
+				return
+			}
+			_, err = io.Copy(assembledFile, chunkFile)
+			closeFile(chunkFile)
+			if err != nil {
+				return
+			}
+		}
+
+		// necessary approach in case assembly fails
+		for i := 0; i < numEntries; i++ {
+			target := filepath.Join(chunksFolderName, strconv.Itoa(i))
+			err = cv.mount.Unlink(target)
+			if err != nil {
+				return
+			}
+		}
+		// BUGFIX: the chunk folder is a directory; Unlink cannot remove it.
+		_ = cv.mount.RemoveDir(chunksFolderName)
+	})
+	// BUGFIX: the original returned (true, "", nil) unconditionally, losing
+	// errors and signalling "finished" while chunks were still missing.
+	if err != nil {
+		return false, "", err
+	}
+	if chunk == "" {
+		// not all chunks have arrived yet
+		return false, "", nil
+	}
+
+	return true, chunk, nil
+}
+
+// WriteChunk saves an intermediate chunk temporarily and assembles all chunks
+// once the final one is received.
+func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, string, error) { + finish, chunk, err := c.saveChunk(fn, r) + if err != nil { + return "", "", err + } + + if !finish { + return "", "", nil + } + + chunkInfo, err := GetChunkBLOBInfo(fn) + if err != nil { + return "", "", err + } + + return chunkInfo.Path, chunk, nil + + // TODO(labkode): implement old chunking + + /* + req2 := &provider.StartWriteSessionRequest{} + res2, err := client.StartWriteSession(ctx, req2) + if err != nil { + logger.Error(ctx, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if res2.Status.Code != rpc.Code_CODE_OK { + logger.Println(ctx, res2.Status) + w.WriteHeader(http.StatusInternalServerError) + return + } + + sessID := res2.SessionId + logger.Build().Str("sessID", sessID).Msg(ctx, "got write session id") + + stream, err := client.Write(ctx) + if err != nil { + logger.Error(ctx, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + buffer := make([]byte, 1024*1024*3) + var offset uint64 + var numChunks uint64 + + for { + n, err := fd.Read(buffer) + if n > 0 { + req := &provider.WriteRequest{Data: buffer, Length: uint64(n), SessionId: sessID, Offset: offset} + err = stream.Send(req) + if err != nil { + logger.Error(ctx, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + numChunks++ + offset += uint64(n) + } + + if err == io.EOF { + break + } + + if err != nil { + logger.Error(ctx, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + } + + res3, err := stream.CloseAndRecv() + if err != nil { + logger.Error(ctx, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if res3.Status.Code != rpc.Code_CODE_OK { + logger.Println(ctx, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + req4 := &provider.FinishWriteSessionRequest{Filename: chunkInfo.path, SessionId: sessID} + res4, err := client.FinishWriteSession(ctx, req4) + if err != nil { + logger.Error(ctx, err) + 
w.WriteHeader(http.StatusInternalServerError) + return + } + + if res4.Status.Code != rpc.Code_CODE_OK { + logger.Println(ctx, res4.Status) + w.WriteHeader(http.StatusInternalServerError) + return + } + + req.Filename = chunkInfo.path + res, err = client.Stat(ctx, req) + if err != nil { + logger.Error(ctx, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if res.Status.Code != rpc.Code_CODE_OK { + logger.Println(ctx, res.Status) + w.WriteHeader(http.StatusInternalServerError) + return + } + + md2 := res.Metadata + + w.Header().Add("Content-Type", md2.Mime) + w.Header().Set("ETag", md2.Etag) + w.Header().Set("OC-FileId", md2.Id) + w.Header().Set("OC-ETag", md2.Etag) + t := time.Unix(int64(md2.Mtime), 0) + lastModifiedString := t.Format(time.RFC1123Z) + w.Header().Set("Last-Modified", lastModifiedString) + w.Header().Set("X-OC-MTime", "accepted") + + if md == nil { + w.WriteHeader(http.StatusCreated) + return + } + + w.WriteHeader(http.StatusNoContent) + return + */ +} diff --git a/pkg/storage/fs/cephfs/connections.go b/pkg/storage/fs/cephfs/connections.go new file mode 100644 index 00000000000..7b928eaa633 --- /dev/null +++ b/pkg/storage/fs/cephfs/connections.go @@ -0,0 +1,315 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +//go:build ceph +// +build ceph + +package cephfs + +import ( + "context" + "fmt" + "time" + + "github.com/ceph/go-ceph/cephfs/admin" + rados2 "github.com/ceph/go-ceph/rados" + grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + "github.com/cs3org/reva/pkg/rgrpc/todo/pool" + "github.com/pkg/errors" + + cephfs2 "github.com/ceph/go-ceph/cephfs" + "github.com/dgraph-io/ristretto" + "golang.org/x/sync/semaphore" +) + +type cacheVal struct { + perm *cephfs2.UserPerm + mount *cephfs2.MountInfo +} + +//TODO: Add to cephfs obj + +type connections struct { + cache *ristretto.Cache + lock *semaphore.Weighted + ctx context.Context + userCache *ristretto.Cache + groupCache *ristretto.Cache +} + +//TODO: make configurable/add to options +var usrLimit int64 = 1e4 + +func newCache() (c *connections, err error) { + cache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: 1e7, + MaxCost: usrLimit, + BufferItems: 64, + OnEvict: func(item *ristretto.Item) { + v := item.Value.(cacheVal) + v.perm.Destroy() + _ = v.mount.Unmount() + _ = v.mount.Release() + }, + }) + if err != nil { + return + } + + ucache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: 1e7, + MaxCost: 10 * usrLimit, + BufferItems: 64, + }) + if err != nil { + return + } + + gcache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: 1e7, + MaxCost: 10 * usrLimit, + BufferItems: 64, + }) + if err != nil { + return + } + + c = &connections{ + cache: cache, + lock: semaphore.NewWeighted(usrLimit), + ctx: context.Background(), + userCache: ucache, + groupCache: gcache, + } + + return +} + +func (c *connections) clearCache() { + c.cache.Clear() + 
c.cache.Close() +} + +type adminConn struct { + indexPoolName string + subvolAdmin *admin.FSAdmin + adminMount Mount + radosConn *rados2.Conn + radosIO *rados2.IOContext +} + +func newAdminConn(poolName string) *adminConn { + rados, err := rados2.NewConn() + if err != nil { + return nil + } + if err = rados.ReadDefaultConfigFile(); err != nil { + return nil + } + + if err = rados.Connect(); err != nil { + return nil + } + + pools, err := rados.ListPools() + if err != nil { + rados.Shutdown() + return nil + } + + var radosIO *rados2.IOContext + if in(poolName, pools) { + radosIO, err = rados.OpenIOContext(poolName) + if err != nil { + rados.Shutdown() + return nil + } + } else { + err = rados.MakePool(poolName) + if err != nil { + rados.Shutdown() + return nil + } + radosIO, err = rados.OpenIOContext(poolName) + if err != nil { + rados.Shutdown() + return nil + } + } + + mount, err := cephfs2.CreateFromRados(rados) + if err != nil { + rados.Shutdown() + return nil + } + + if err = mount.Mount(); err != nil { + rados.Shutdown() + destroyCephConn(mount, nil) + return nil + } + + return &adminConn{ + poolName, + admin.NewFromConn(rados), + mount, + rados, + radosIO, + } +} + +func newConn(user *User) *cacheVal { + var perm *cephfs2.UserPerm + mount, err := cephfs2.CreateMount() + if err != nil { + return destroyCephConn(mount, perm) + } + if err = mount.ReadDefaultConfigFile(); err != nil { + return destroyCephConn(mount, perm) + } + if err = mount.Init(); err != nil { + return destroyCephConn(mount, perm) + } + + if user != nil { //nil creates admin conn + perm = cephfs2.NewUserPerm(int(user.UidNumber), int(user.GidNumber), []int{}) + if err = mount.SetMountPerms(perm); err != nil { + return destroyCephConn(mount, perm) + } + } + + if err = mount.MountWithRoot("/"); err != nil { + return destroyCephConn(mount, perm) + } + + if user != nil { + if err = mount.ChangeDir(user.fs.conf.Root); err != nil { + return destroyCephConn(mount, perm) + } + } + + return &cacheVal{ + 
perm: perm, + mount: mount, + } +} + +func (fs *cephfs) getUserByID(ctx context.Context, uid string) (*userpb.User, error) { + if entity, found := fs.conn.userCache.Get(uid); found { + return entity.(*userpb.User), nil + } + + client, err := pool.GetGatewayServiceClient(fs.conf.GatewaySvc) + if err != nil { + return nil, errors.Wrap(err, "cephfs: error getting gateway grpc client") + } + getUserResp, err := client.GetUserByClaim(ctx, &userpb.GetUserByClaimRequest{ + Claim: "uid", + Value: uid, + }) + + if err != nil { + return nil, errors.Wrap(err, "cephfs: error getting user") + } + if getUserResp.Status.Code != rpc.Code_CODE_OK { + return nil, errors.Wrap(err, "cephfs: grpc get user failed") + } + fs.conn.userCache.SetWithTTL(uid, getUserResp.User, 1, 24*time.Hour) + fs.conn.userCache.SetWithTTL(getUserResp.User.Id.OpaqueId, getUserResp.User, 1, 24*time.Hour) + + return getUserResp.User, nil +} + +func (fs *cephfs) getUserByOpaqueID(ctx context.Context, oid string) (*userpb.User, error) { + if entity, found := fs.conn.userCache.Get(oid); found { + return entity.(*userpb.User), nil + } + client, err := pool.GetGatewayServiceClient(fs.conf.GatewaySvc) + if err != nil { + return nil, errors.Wrap(err, "cephfs: error getting gateway grpc client") + } + getUserResp, err := client.GetUser(ctx, &userpb.GetUserRequest{ + UserId: &userpb.UserId{ + OpaqueId: oid, + }, + }) + + if err != nil { + return nil, errors.Wrap(err, "cephfs: error getting user") + } + if getUserResp.Status.Code != rpc.Code_CODE_OK { + return nil, errors.Wrap(err, "cephfs: grpc get user failed") + } + fs.conn.userCache.SetWithTTL(fmt.Sprint(getUserResp.User.UidNumber), getUserResp.User, 1, 24*time.Hour) + fs.conn.userCache.SetWithTTL(oid, getUserResp.User, 1, 24*time.Hour) + + return getUserResp.User, nil +} + +func (fs *cephfs) getGroupByID(ctx context.Context, gid string) (*grouppb.Group, error) { + if entity, found := fs.conn.groupCache.Get(gid); found { + return entity.(*grouppb.Group), nil + } + 
+ client, err := pool.GetGatewayServiceClient(fs.conf.GatewaySvc) + if err != nil { + return nil, errors.Wrap(err, "cephfs: error getting gateway grpc client") + } + getGroupResp, err := client.GetGroupByClaim(ctx, &grouppb.GetGroupByClaimRequest{ + Claim: "gid", + Value: gid, + }) + if err != nil { + return nil, errors.Wrap(err, "cephfs: error getting group") + } + if getGroupResp.Status.Code != rpc.Code_CODE_OK { + return nil, errors.Wrap(err, "cephfs: grpc get group failed") + } + fs.conn.groupCache.SetWithTTL(gid, getGroupResp.Group, 1, 24*time.Hour) + fs.conn.groupCache.SetWithTTL(getGroupResp.Group.Id.OpaqueId, getGroupResp.Group, 1, 24*time.Hour) + + return getGroupResp.Group, nil +} + +func (fs *cephfs) getGroupByOpaqueID(ctx context.Context, oid string) (*grouppb.Group, error) { + if entity, found := fs.conn.groupCache.Get(oid); found { + return entity.(*grouppb.Group), nil + } + client, err := pool.GetGatewayServiceClient(fs.conf.GatewaySvc) + if err != nil { + return nil, errors.Wrap(err, "cephfs: error getting gateway grpc client") + } + getGroupResp, err := client.GetGroup(ctx, &grouppb.GetGroupRequest{ + GroupId: &grouppb.GroupId{ + OpaqueId: oid, + }, + }) + + if err != nil { + return nil, errors.Wrap(err, "cephfs: error getting group") + } + if getGroupResp.Status.Code != rpc.Code_CODE_OK { + return nil, errors.Wrap(err, "cephfs: grpc get group failed") + } + fs.conn.userCache.SetWithTTL(fmt.Sprint(getGroupResp.Group.GidNumber), getGroupResp.Group, 1, 24*time.Hour) + fs.conn.userCache.SetWithTTL(oid, getGroupResp.Group, 1, 24*time.Hour) + + return getGroupResp.Group, nil +} diff --git a/pkg/storage/fs/cephfs/errors.go b/pkg/storage/fs/cephfs/errors.go new file mode 100644 index 00000000000..a4ab013c977 --- /dev/null +++ b/pkg/storage/fs/cephfs/errors.go @@ -0,0 +1,64 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +//go:build ceph +// +build ceph + +package cephfs + +/* + #include + #include + #include +*/ +import "C" +import ( + "fmt" + + "github.com/cs3org/reva/pkg/errtypes" +) + +func wrapErrorMsg(code C.int) string { + return fmt.Sprintf("cephfs: ret=-%d, %s", code, C.GoString(C.strerror(code))) +} + +var ( + errNotFound = wrapErrorMsg(C.ENOENT) + errFileExists = wrapErrorMsg(C.EEXIST) + errNoSpaceLeft = wrapErrorMsg(C.ENOSPC) + errIsADirectory = wrapErrorMsg(C.EISDIR) + errPermissionDenied = wrapErrorMsg(C.EACCES) +) + +func getRevaError(err error) error { + if err == nil { + return nil + } + switch err.Error() { + case errNotFound: + return errtypes.NotFound("cephfs: dir entry not found") + case errPermissionDenied: + return errtypes.PermissionDenied("cephfs: permission denied") + case errFileExists: + return errtypes.AlreadyExists("cephfs: file already exists") + case errNoSpaceLeft: + return errtypes.InsufficientStorage("cephfs: no space left on device") + default: + return errtypes.InternalError(err.Error()) + } +} diff --git a/pkg/storage/fs/cephfs/options.go b/pkg/storage/fs/cephfs/options.go new file mode 100644 index 00000000000..0b4b81f0e69 --- /dev/null +++ b/pkg/storage/fs/cephfs/options.go @@ -0,0 +1,90 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file 
except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +//go:build ceph +// +build ceph + +package cephfs + +import ( + "path/filepath" + + "github.com/cs3org/reva/pkg/sharedconf" +) + +// Options for the cephfs module +type Options struct { + GatewaySvc string `mapstructure:"gatewaysvc"` + IndexPool string `mapstructure:"index_pool"` + Root string `mapstructure:"root"` + ShadowFolder string `mapstructure:"shadow_folder"` + ShareFolder string `mapstructure:"share_folder"` + UploadFolder string `mapstructure:"uploads"` + UserLayout string `mapstructure:"user_layout"` + + DisableHome bool `mapstructure:"disable_home"` + UserQuotaBytes uint64 `mapstructure:"user_quota_bytes"` + HiddenDirs map[string]bool +} + +func (c *Options) fillDefaults() { + c.GatewaySvc = sharedconf.GetGatewaySVC(c.GatewaySvc) + + if c.IndexPool == "" { + c.IndexPool = "path_index" + } + + if c.Root == "" { + c.Root = "/home" + } else { + c.Root = addLeadingSlash(c.Root) //force absolute path in case leading "/" is omitted + } + + if c.ShadowFolder == "" { + c.ShadowFolder = "/.reva_hidden" + } else { + c.ShadowFolder = addLeadingSlash(c.ShadowFolder) + } + + if c.ShareFolder == "" { + c.ShareFolder = "/Shares" + } else { + c.ShareFolder = addLeadingSlash(c.ShareFolder) + } + + if c.UploadFolder == "" { + c.UploadFolder = ".uploads" + } + c.UploadFolder = filepath.Join(c.ShadowFolder, 
c.UploadFolder) + + if c.UserLayout == "" { + c.UserLayout = "{{.Username}}" + } + + c.HiddenDirs = map[string]bool{ + ".": true, + "..": true, + removeLeadingSlash(c.ShadowFolder): true, + } + + c.DisableHome = false // it is currently only home based + + if c.UserQuotaBytes == 0 { + c.UserQuotaBytes = 50000000000 + } +} diff --git a/pkg/storage/fs/cephfs/permissions.go b/pkg/storage/fs/cephfs/permissions.go new file mode 100644 index 00000000000..8b4a32484de --- /dev/null +++ b/pkg/storage/fs/cephfs/permissions.go @@ -0,0 +1,325 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+ +//go:build ceph +// +build ceph + +package cephfs + +import ( + "context" + "errors" + "fmt" + "strings" + + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + + cephfs2 "github.com/ceph/go-ceph/cephfs" + grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/maxymania/go-system/posix_acl" +) + +var perms = map[rune][]string{ + 'r': { + "Stat", + "GetPath", + "GetQuota", + "InitiateFileDownload", + "ListGrants", + }, + 'w': { + "AddGrant", + "CreateContainer", + "Delete", + "InitiateFileUpload", + "Move", + "RemoveGrant", + "PurgeRecycle", + "RestoreFileVersion", + "RestoreRecycleItem", + "UpdateGrant", + }, + 'x': { + "ListRecycle", + "ListContainer", + "ListFileVersions", + }, +} + +const ( + aclXattr = "system.posix_acl_access" +) + +var op2int = map[rune]uint16{'r': 4, 'w': 2, 'x': 1} + +func getPermissionSet(user *User, stat *cephfs2.CephStatx, mount Mount, path string) (perm *provider.ResourcePermissions) { + perm = &provider.ResourcePermissions{} + + if int64(stat.Uid) == user.UidNumber || int64(stat.Gid) == user.GidNumber { + updatePerms(perm, "rwx", false) + return + } + + acls := &posix_acl.Acl{} + var xattr []byte + var err error + if xattr, err = mount.GetXattr(path, aclXattr); err != nil { + return nil + } + acls.Decode(xattr) + + group, err := user.fs.getGroupByID(user.ctx, fmt.Sprint(stat.Gid)) + + for _, acl := range acls.List { + rwx := strings.Split(acl.String(), ":")[2] + switch acl.GetType() { + case posix_acl.ACL_USER: + if int64(acl.GetID()) == user.UidNumber { + updatePerms(perm, rwx, false) + } + case posix_acl.ACL_GROUP: + if int64(acl.GetID()) == user.GidNumber || in(group.GroupName, user.Groups) { + updatePerms(perm, rwx, false) + } + case posix_acl.ACL_MASK: + updatePerms(perm, rwx, true) + case posix_acl.ACL_OTHERS: + updatePerms(perm, rwx, false) + } + } + + return +} + +func (fs *cephfs) getFullPermissionSet(ctx 
context.Context, mount Mount, path string) (permList []*provider.Grant) { + acls := &posix_acl.Acl{} + var xattr []byte + var err error + if xattr, err = mount.GetXattr(path, aclXattr); err != nil { + return nil + } + acls.Decode(xattr) + + for _, acl := range acls.List { + rwx := strings.Split(acl.String(), ":")[2] + switch acl.GetType() { + case posix_acl.ACL_USER: + user, err := fs.getUserByID(ctx, fmt.Sprint(acl.GetID())) + if err != nil { + return nil + } + userGrant := &provider.Grant{ + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &provider.Grantee_UserId{UserId: user.Id}, + }, + Permissions: &provider.ResourcePermissions{}, + } + updatePerms(userGrant.Permissions, rwx, false) + permList = append(permList, userGrant) + case posix_acl.ACL_GROUP: + group, err := fs.getGroupByID(ctx, fmt.Sprint(acl.GetID())) + if err != nil { + return nil + } + groupGrant := &provider.Grant{ + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_GROUP, + Id: &provider.Grantee_GroupId{GroupId: group.Id}, + }, + Permissions: &provider.ResourcePermissions{}, + } + updatePerms(groupGrant.Permissions, rwx, false) + permList = append(permList, groupGrant) + } + } + + return +} + +/* +func permToIntRefl(p *provider.ResourcePermissions) (result uint16) { + if p == nil { return 0b111 } //rwx + + item := reflect.ValueOf(p).Elem() + for _, op := range "rwx" { + for _, perm := range perms[op] { + if item.FieldByName(perm).Bool() { + result |= op2int[op] + break //if value is 1 then bitwise OR can never change it again + } + } + } + + return +} +*/ + +func permToInt(rp *provider.ResourcePermissions) (result uint16) { + if rp == nil { + return 0b111 // rwx + } + if rp.Stat || rp.GetPath || rp.GetQuota || rp.ListGrants || rp.InitiateFileDownload { + result |= 4 + } + if rp.CreateContainer || rp.Move || rp.Delete || rp.InitiateFileUpload || rp.AddGrant || rp.UpdateGrant || + rp.RemoveGrant || rp.DenyGrant || rp.RestoreFileVersion || 
rp.PurgeRecycle || rp.RestoreRecycleItem { + result |= 2 + } + if rp.ListRecycle || rp.ListContainer || rp.ListFileVersions { + result |= 1 + } + + return +} + +const ( + updateGrant = iota + removeGrant = iota +) + +func (fs *cephfs) changePerms(ctx context.Context, mt Mount, grant *provider.Grant, path string, method int) (err error) { + buf, err := mt.GetXattr(path, aclXattr) + if err != nil { + return + } + acls := &posix_acl.Acl{} + acls.Decode(buf) + var sid posix_acl.AclSID + + switch grant.Grantee.Type { + case provider.GranteeType_GRANTEE_TYPE_USER: + var user *userpb.User + if user, err = fs.getUserByOpaqueID(ctx, grant.Grantee.GetUserId().OpaqueId); err != nil { + return + } + sid.SetUid(uint32(user.UidNumber)) + case provider.GranteeType_GRANTEE_TYPE_GROUP: + var group *grouppb.Group + if group, err = fs.getGroupByOpaqueID(ctx, grant.Grantee.GetGroupId().OpaqueId); err != nil { + return + } + sid.SetGid(uint32(group.GidNumber)) + default: + return errors.New("cephfs: invalid grantee type") + } + + var found = false + var i int + for i = range acls.List { + if acls.List[i].AclSID == sid { + found = true + } + } + + if method == updateGrant { + if found { + acls.List[i].Perm |= permToInt(grant.Permissions) + if acls.List[i].Perm == 0 { // remove empty grant + acls.List = append(acls.List[:i], acls.List[i+1:]...) + } + } else { + acls.List = append(acls.List, posix_acl.AclElement{ + AclSID: sid, + Perm: permToInt(grant.Permissions), + }) + } + } else { //removeGrant + if found { + acls.List[i].Perm &^= permToInt(grant.Permissions) //bitwise and-not, to clear bits on Perm + if acls.List[i].Perm == 0 { // remove empty grant + acls.List = append(acls.List[:i], acls.List[i+1:]...) 
+ } + } + } + + err = mt.SetXattr(path, aclXattr, acls.Encode(), 0) + + return +} + +/* +func updatePermsRefl(rp *provider.ResourcePermissions, acl string, unset bool) { + if rp == nil { return } + for _, t := range "rwx" { + if strings.ContainsRune(acl, t) { + for _, i := range perms[t] { + reflect.ValueOf(rp).Elem().FieldByName(i).SetBool(true) + } + } else if unset { + for _, i := range perms[t] { + reflect.ValueOf(rp).Elem().FieldByName(i).SetBool(false) + } + } + } +} +*/ + +func updatePerms(rp *provider.ResourcePermissions, acl string, unset bool) { + if rp == nil { + return + } + if strings.ContainsRune(acl, 'r') { + rp.Stat = true + rp.GetPath = true + rp.GetQuota = true + rp.InitiateFileDownload = true + rp.ListGrants = true + } else if unset { + rp.Stat = false + rp.GetPath = false + rp.GetQuota = false + rp.InitiateFileDownload = false + rp.ListGrants = false + } + if strings.ContainsRune(acl, 'w') { + rp.AddGrant = true + rp.DenyGrant = true + rp.CreateContainer = true + rp.Delete = true + rp.InitiateFileUpload = true + rp.Move = true + rp.RemoveGrant = true + rp.PurgeRecycle = true + rp.RestoreFileVersion = true + rp.RestoreRecycleItem = true + rp.UpdateGrant = true + } else if unset { + rp.AddGrant = false + rp.DenyGrant = false + rp.CreateContainer = false + rp.Delete = false + rp.InitiateFileUpload = false + rp.Move = false + rp.RemoveGrant = false + rp.PurgeRecycle = false + rp.RestoreFileVersion = false + rp.RestoreRecycleItem = false + rp.UpdateGrant = false + } + if strings.ContainsRune(acl, 'x') { + rp.ListRecycle = true + rp.ListContainer = true + rp.ListFileVersions = true + } else if unset { + rp.ListRecycle = false + rp.ListContainer = false + rp.ListFileVersions = false + } +} diff --git a/pkg/storage/fs/cephfs/unsupported.go b/pkg/storage/fs/cephfs/unsupported.go new file mode 100644 index 00000000000..a337f3f7895 --- /dev/null +++ b/pkg/storage/fs/cephfs/unsupported.go @@ -0,0 +1,39 @@ +// Copyright 2018-2021 CERN +// +// Licensed under 
the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +//go:build !ceph +// +build !ceph + +package cephfs + +import ( + "github.com/pkg/errors" + + "github.com/cs3org/reva/pkg/storage" + "github.com/cs3org/reva/pkg/storage/fs/registry" +) + +func init() { + registry.Register("cephfs", New) +} + +// New returns an implementation to of the storage.FS interface that talk to +// a ceph filesystem. +func New(m map[string]interface{}) (storage.FS, error) { + return nil, errors.New("cephfs: revad was compiled without CephFS support") +} diff --git a/pkg/storage/fs/cephfs/upload.go b/pkg/storage/fs/cephfs/upload.go new file mode 100644 index 00000000000..2d8eccf7c97 --- /dev/null +++ b/pkg/storage/fs/cephfs/upload.go @@ -0,0 +1,413 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +//go:build ceph +// +build ceph + +package cephfs + +import ( + "bytes" + "context" + "encoding/json" + "io" + "os" + "path/filepath" + + cephfs2 "github.com/ceph/go-ceph/cephfs" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/appctx" + ctx2 "github.com/cs3org/reva/pkg/ctx" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/utils" + "github.com/google/uuid" + "github.com/pkg/errors" + tusd "github.com/tus/tusd/pkg/handler" +) + +func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { + user := fs.makeUser(ctx) + upload, err := fs.GetUpload(ctx, ref.GetPath()) + if err != nil { + metadata := map[string]string{"sizedeferred": "true"} + uploadIDs, err := fs.InitiateUpload(ctx, ref, 0, metadata) + if err != nil { + return err + } + if upload, err = fs.GetUpload(ctx, uploadIDs["simple"]); err != nil { + return errors.Wrap(err, "cephfs: error retrieving upload") + } + } + + uploadInfo := upload.(*fileUpload) + + p := uploadInfo.info.Storage["InternalDestination"] + ok, err := IsChunked(p) + if err != nil { + return errors.Wrap(err, "cephfs: error checking path") + } + if ok { + var assembledFile string + p, assembledFile, err = NewChunkHandler(ctx, fs).WriteChunk(p, r) + if err != nil { + return err + } + if p == "" { + if err = uploadInfo.Terminate(ctx); err != nil { + return errors.Wrap(err, "cephfs: error removing auxiliary files") + } + return errtypes.PartialContent(ref.String()) + } + uploadInfo.info.Storage["InternalDestination"] = p + + user.op(func(cv *cacheVal) { + r, err = 
cv.mount.Open(assembledFile, os.O_RDONLY, 0) + }) + if err != nil { + return errors.Wrap(err, "cephfs: error opening assembled file") + } + defer r.Close() + defer user.op(func(cv *cacheVal) { + _ = cv.mount.Unlink(assembledFile) + }) + } + + if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { + return errors.Wrap(err, "cephfs: error writing to binary file") + } + + return uploadInfo.FinishUpload(ctx) +} + +func (fs *cephfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { + user := fs.makeUser(ctx) + np, err := user.resolveRef(ref) + if err != nil { + return nil, errors.Wrap(err, "cephfs: error resolving reference") + } + + info := tusd.FileInfo{ + MetaData: tusd.MetaData{ + "filename": filepath.Base(np), + "dir": filepath.Dir(np), + }, + Size: uploadLength, + } + + if metadata != nil { + if metadata["mtime"] != "" { + info.MetaData["mtime"] = metadata["mtime"] + } + if _, ok := metadata["sizedeferred"]; ok { + info.SizeIsDeferred = true + } + } + + upload, err := fs.NewUpload(ctx, info) + if err != nil { + return nil, err + } + + info, _ = upload.GetInfo(ctx) + + return map[string]string{ + "simple": info.ID, + "tus": info.ID, + }, nil +} + +// UseIn tells the tus upload middleware which extensions it supports. 
+func (fs *cephfs) UseIn(composer *tusd.StoreComposer) { + composer.UseCore(fs) + composer.UseTerminater(fs) +} + +func (fs *cephfs) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tusd.Upload, err error) { + log := appctx.GetLogger(ctx) + log.Debug().Interface("info", info).Msg("cephfs: NewUpload") + + user := fs.makeUser(ctx) + + fn := info.MetaData["filename"] + if fn == "" { + return nil, errors.New("cephfs: missing filename in metadata") + } + info.MetaData["filename"] = filepath.Clean(info.MetaData["filename"]) + + dir := info.MetaData["dir"] + if dir == "" { + return nil, errors.New("cephfs: missing dir in metadata") + } + info.MetaData["dir"] = filepath.Clean(info.MetaData["dir"]) + + np := filepath.Join(info.MetaData["dir"], info.MetaData["filename"]) + + info.ID = uuid.New().String() + + binPath := fs.getUploadPath(info.ID) + + info.Storage = map[string]string{ + "Type": "Cephfs", + "BinPath": binPath, + "InternalDestination": np, + + "Idp": user.Id.Idp, + "UserId": user.Id.OpaqueId, + "UserName": user.Username, + "UserType": utils.UserTypeToString(user.Id.Type), + + "LogLevel": log.GetLevel().String(), + } + + // Create binary file with no content + user.op(func(cv *cacheVal) { + var f *cephfs2.File + defer closeFile(f) + f, err = cv.mount.Open(binPath, os.O_CREATE|os.O_WRONLY, filePermDefault) + if err != nil { + return + } + }) + //TODO: if we get two same upload ids, the second one can't upload at all + if err != nil { + return + } + + upload = &fileUpload{ + info: info, + binPath: binPath, + infoPath: binPath + ".info", + fs: fs, + ctx: ctx, + } + + if !info.SizeIsDeferred && info.Size == 0 { + log.Debug().Interface("info", info).Msg("cephfs: finishing upload for empty file") + // no need to create info file and finish directly + err = upload.FinishUpload(ctx) + + return + } + + // writeInfo creates the file by itself if necessary + err = upload.(*fileUpload).writeInfo() + + return +} + +func (fs *cephfs) getUploadPath(uploadID string) 
string { + return filepath.Join(fs.conf.UploadFolder, uploadID) +} + +// GetUpload returns the Upload for the given upload id +func (fs *cephfs) GetUpload(ctx context.Context, id string) (fup tusd.Upload, err error) { + binPath := fs.getUploadPath(id) + info := tusd.FileInfo{} + if err != nil { + return nil, errtypes.NotFound("bin path for upload " + id + " not found") + } + infoPath := binPath + ".info" + + var data bytes.Buffer + f, err := fs.adminConn.adminMount.Open(infoPath, os.O_RDONLY, 0) + if err != nil { + return + } + _, err = io.Copy(&data, f) + if err != nil { + return + } + if err = json.Unmarshal(data.Bytes(), &info); err != nil { + return + } + + u := &userpb.User{ + Id: &userpb.UserId{ + Idp: info.Storage["Idp"], + OpaqueId: info.Storage["UserId"], + }, + Username: info.Storage["UserName"], + } + ctx = ctx2.ContextSetUser(ctx, u) + user := fs.makeUser(ctx) + + var stat Statx + user.op(func(cv *cacheVal) { + stat, err = cv.mount.Statx(binPath, cephfs2.StatxSize, 0) + }) + if err != nil { + return + } + info.Offset = int64(stat.Size) + + return &fileUpload{ + info: info, + binPath: binPath, + infoPath: infoPath, + fs: fs, + ctx: ctx, + }, nil +} + +type fileUpload struct { + // info stores the current information about the upload + info tusd.FileInfo + // infoPath is the path to the .info file + infoPath string + // binPath is the path to the binary file (which has no extension) + binPath string + // only fs knows how to handle metadata and versions + fs *cephfs + // a context with a user + ctx context.Context +} + +// GetInfo returns the FileInfo +func (upload *fileUpload) GetInfo(ctx context.Context) (tusd.FileInfo, error) { + return upload.info, nil +} + +// GetReader returns an io.Reader for the upload +func (upload *fileUpload) GetReader(ctx context.Context) (file io.Reader, err error) { + user := upload.fs.makeUser(upload.ctx) + user.op(func(cv *cacheVal) { + file, err = cv.mount.Open(upload.binPath, os.O_RDONLY, 0) + }) + return +} + +// 
WriteChunk writes the stream from the reader to the given offset of the upload +func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (n int64, err error) { + var file io.WriteCloser + user := upload.fs.makeUser(upload.ctx) + user.op(func(cv *cacheVal) { + file, err = cv.mount.Open(upload.binPath, os.O_WRONLY|os.O_APPEND, 0) + }) + if err != nil { + return 0, err + } + defer file.Close() + + n, err = io.Copy(file, src) + + // If the HTTP PATCH request gets interrupted in the middle (e.g. because + // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. + // However, for OwnCloudStore it's not important whether the stream has ended + // on purpose or accidentally. + if err != nil { + if err != io.ErrUnexpectedEOF { + return n, err + } + } + + upload.info.Offset += n + err = upload.writeInfo() + + return n, err +} + +// writeInfo updates the entire information. Everything will be overwritten. +func (upload *fileUpload) writeInfo() error { + data, err := json.Marshal(upload.info) + + if err != nil { + return err + } + user := upload.fs.makeUser(upload.ctx) + user.op(func(cv *cacheVal) { + var file io.WriteCloser + if file, err = cv.mount.Open(upload.infoPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, filePermDefault); err != nil { + return + } + defer file.Close() + + _, err = io.Copy(file, bytes.NewReader(data)) + }) + + return err +} + +// FinishUpload finishes an upload and moves the file to the internal destination +func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { + + np := upload.info.Storage["InternalDestination"] + + // TODO check etag with If-Match header + // if destination exists + // if _, err := os.Stat(np); err == nil { + // the local storage does not store metadata + // the fileid is based on the path, so no we do not need to copy it to the new file + // the local storage does not track revisions + // } + + // if destination exists + // if _, err := os.Stat(np); err == nil 
{ + // create revision + // if err := upload.fs.archiveRevision(upload.ctx, np); err != nil { + // return err + // } + // } + + user := upload.fs.makeUser(upload.ctx) + log := appctx.GetLogger(ctx) + + user.op(func(cv *cacheVal) { + err = cv.mount.Rename(upload.binPath, np) + }) + if err != nil { + return errors.Wrap(err, upload.binPath) + } + + // only delete the upload if it was successfully written to the fs + user.op(func(cv *cacheVal) { + err = cv.mount.Unlink(upload.infoPath) + }) + if err != nil { + if err.Error() != errNotFound { + log.Err(err).Interface("info", upload.info).Msg("cephfs: could not delete upload metadata") + } + } + + // TODO: set mtime if specified in metadata + + return +} + +// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination +// - the storage needs to implement AsTerminatableUpload +// - the upload needs to implement Terminate + +// AsTerminatableUpload returns a a TerminatableUpload +func (fs *cephfs) AsTerminatableUpload(upload tusd.Upload) tusd.TerminatableUpload { + return upload.(*fileUpload) +} + +// Terminate terminates the upload +func (upload *fileUpload) Terminate(ctx context.Context) (err error) { + user := upload.fs.makeUser(upload.ctx) + + user.op(func(cv *cacheVal) { + if err = cv.mount.Unlink(upload.infoPath); err != nil { + return + } + err = cv.mount.Unlink(upload.binPath) + }) + + return +} diff --git a/pkg/storage/fs/cephfs/user.go b/pkg/storage/fs/cephfs/user.go new file mode 100644 index 00000000000..e99686adfd7 --- /dev/null +++ b/pkg/storage/fs/cephfs/user.go @@ -0,0 +1,242 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// In applying this license, CERN does not waive the privileges and immunities
+// granted to it by virtue of its status as an Intergovernmental Organization
+// or submit itself to any jurisdiction.
+
+//go:build ceph
+// +build ceph
+
+package cephfs
+
+import (
+	"context"
+	"fmt"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"syscall"
+
+	cephfs2 "github.com/ceph/go-ceph/cephfs"
+	userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
+	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
+	typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
+	ctx2 "github.com/cs3org/reva/pkg/ctx"
+	"github.com/cs3org/reva/pkg/errtypes"
+	"github.com/cs3org/reva/pkg/mime"
+	"github.com/cs3org/reva/pkg/storage/utils/templates"
+	"github.com/pkg/errors"
+)
+
+type callBack func(cb *cacheVal)
+
+// User custom type to add functionality to current struct
+type User struct {
+	*userv1beta1.User
+	fs   *cephfs
+	ctx  context.Context
+	home string
+}
+
+// makeUser wraps the user from ctx together with its home directory,
+// resolved from the configured root and user layout template.
+func (fs *cephfs) makeUser(ctx context.Context) *User {
+	u := ctx2.ContextMustGetUser(ctx)
+	home := filepath.Join(fs.conf.Root, templates.WithUser(u, fs.conf.UserLayout))
+	return &User{u, fs, ctx, home}
+}
+
+// absPath anchors a relative path inside the user's home.
+func (user *User) absPath(path string) string {
+	//shares will always be absolute to avoid prepending the user path to the path of the file's owner
+	if !filepath.IsAbs(path) {
+		path = filepath.Join(user.home, path)
+	}
+
+	return path
+}
+
+// op acquires a slot on the connection semaphore and runs cb with this
+// user's cached ceph mount, creating (and caching) the mount on first use.
+// NOTE(review): a failed semaphore acquire or mount creation silently skips
+// cb — callers only notice through out-params the callback never set.
+func (user *User) op(cb callBack) {
+	conn := user.fs.conn
+	if err := conn.lock.Acquire(conn.ctx, 1); err != nil {
+		return
+	}
+	defer conn.lock.Release(1)
+
+	val, found := conn.cache.Get(user.Id.OpaqueId)
+	if !found {
+		cvalue := newConn(user)
+		if cvalue == nil {
+			return
+		}
+		conn.cache.Set(user.Id.OpaqueId, cvalue, 1)
+		cb(cvalue)
+		return
+	}
+
+	cb(val.(*cacheVal))
+}
+
+// fileAsResourceInfo builds a CS3 ResourceInfo for the entry at path from
+// its statx data, optionally restricting the returned extended attributes
+// to mdKeys ("*" or an empty list means all).
+func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *cephfs2.CephStatx, mdKeys []string) (ri *provider.ResourceInfo, err error) {
+	var (
+		_type  provider.ResourceType
+		target string
+		size   uint64
+		buf    []byte
+	)
+
+	switch int(stat.Mode) & syscall.S_IFMT {
+	case syscall.S_IFDIR:
+		_type = provider.ResourceType_RESOURCE_TYPE_CONTAINER
+		// ceph.dir.rbytes is cephfs' recursive byte count for the directory
+		if buf, err = cv.mount.GetXattr(path, "ceph.dir.rbytes"); err == nil {
+			size, err = strconv.ParseUint(string(buf), 10, 64)
+		}
+	case syscall.S_IFLNK:
+		_type = provider.ResourceType_RESOURCE_TYPE_SYMLINK
+		target, err = cv.mount.Readlink(path)
+	case syscall.S_IFREG:
+		_type = provider.ResourceType_RESOURCE_TYPE_FILE
+		size = stat.Size
+	default:
+		return nil, errors.New("cephfs: unknown entry type")
+	}
+
+	if err != nil {
+		return
+	}
+
+	var xattrs []string
+	keys := make(map[string]bool, len(mdKeys))
+	for _, key := range mdKeys {
+		keys[key] = true
+	}
+	if keys["*"] || len(keys) == 0 {
+		mdKeys = []string{}
+		keys = map[string]bool{}
+	}
+	mx := make(map[string]string)
+	if xattrs, err = cv.mount.ListXattr(path); err == nil {
+		for _, xattr := range xattrs {
+			if len(mdKeys) == 0 || keys[xattr] {
+				if buf, err := cv.mount.GetXattr(path, xattr); err == nil {
+					mx[xattr] = string(buf)
+				}
+			}
+		}
+	}
+
+	//TODO(tmourati): Add entry id logic here
+
+	// directories use the recursive change time so the etag changes when
+	// anything below them changes; files use their own ctime
+	var etag string
+	if isDir(_type) {
+		rctime, _ := cv.mount.GetXattr(path, "ceph.dir.rctime")
+		etag = fmt.Sprint(stat.Inode) + ":" + string(rctime)
+	} else {
+		etag = fmt.Sprint(stat.Inode) + ":" + strconv.FormatInt(stat.Ctime.Sec, 10)
+	}
+
+	mtime := &typesv1beta1.Timestamp{
+		Seconds: uint64(stat.Mtime.Sec),
+		Nanos:   uint32(stat.Mtime.Nsec),
+	}
+
+	perms := getPermissionSet(user, stat, cv.mount, path)
+
+	// only expose user-namespace xattrs as arbitrary metadata
+	for key := range mx {
+		if !strings.HasPrefix(key, xattrUserNs) {
+			delete(mx, key)
+		}
+	}
+
+	var checksum provider.ResourceChecksum
+	if _type == provider.ResourceType_RESOURCE_TYPE_FILE {
+		var md5 string
+		var cserr error
+		// reuse the cached md5 only when its stored timestamp matches the
+		// current mtime; otherwise recompute. A dedicated cserr avoids the
+		// shadowed-err bug of the previous version, where a calcChecksum
+		// failure in the cached-timestamp branch was silently dropped and
+		// an empty sum was reported as a valid MD5.
+		if md5tsBA, xerr := cv.mount.GetXattr(path, xattrMd5ts); xerr == nil {
+			md5ts, _ := strconv.ParseInt(string(md5tsBA), 10, 64)
+			if stat.Mtime.Sec == md5ts {
+				if md5BA, xerr := cv.mount.GetXattr(path, xattrMd5); xerr == nil {
+					md5 = string(md5BA)
+				} else {
+					md5, cserr = calcChecksum(path, cv.mount, stat)
+				}
+			} else {
+				md5, cserr = calcChecksum(path, cv.mount, stat)
+			}
+		} else {
+			md5, cserr = calcChecksum(path, cv.mount, stat)
+		}
+
+		switch {
+		case cserr == nil:
+			checksum.Type = provider.ResourceChecksumType_RESOURCE_CHECKSUM_TYPE_MD5
+			checksum.Sum = md5
+		case cserr.Error() == errPermissionDenied:
+			// cannot read the file: report no checksum rather than failing
+			checksum.Type = provider.ResourceChecksumType_RESOURCE_CHECKSUM_TYPE_UNSET
+		default:
+			return nil, errors.New("cephfs: error calculating checksum of file")
+		}
+	} else {
+		checksum.Type = provider.ResourceChecksumType_RESOURCE_CHECKSUM_TYPE_UNSET
+	}
+
+	var ownerID *userv1beta1.UserId
+	if stat.Uid != 0 {
+		var owner *userv1beta1.User
+		if int64(stat.Uid) != user.UidNumber {
+			owner, err = user.fs.getUserByID(user.ctx, fmt.Sprint(stat.Uid))
+		} else {
+			owner = user.User
+		}
+
+		// also covers a non-nil err from getUserByID, which previously
+		// could leak into the final naked return next to a valid ri
+		if err != nil || owner == nil {
+			return nil, errors.New("cephfs: error getting owner of entry: " + path)
+		}
+
+		ownerID = owner.Id
+	} else {
+		// uid 0 is mapped to a synthetic "root" owner
+		ownerID = &userv1beta1.UserId{OpaqueId: "root"}
+	}
+
+	ri = &provider.ResourceInfo{
+		Type:              _type,
+		Id:                &provider.ResourceId{OpaqueId: fmt.Sprint(stat.Inode)},
+		Checksum:          &checksum,
+		Etag:              etag,
+		MimeType:          mime.Detect(isDir(_type), path),
+		Mtime:             mtime,
+		Path:              path,
+		PermissionSet:     perms,
+		Size:              size,
+		Owner:             ownerID,
+		Target:            target,
+		ArbitraryMetadata: &provider.ArbitraryMetadata{Metadata: mx},
+	}
+
+	return
+}
+
+// resolveRef turns a CS3 reference into a path relative to the mount root.
+// Only path-based references are supported; entry ids are rejected.
+func (user *User) resolveRef(ref *provider.Reference) (str string, err error) {
+	if ref == nil {
+		return "", fmt.Errorf("cephfs: nil reference")
+	}
+
+	if str = ref.GetPath(); str == "" {
+		return "", errtypes.NotSupported("cephfs: entry IDs not currently supported")
+	}
+
+	str = removeLeadingSlash(str) //path must be relative
+
+	return
+}
diff --git a/pkg/storage/fs/cephfs/utils.go b/pkg/storage/fs/cephfs/utils.go
new file mode 100644
index 00000000000..e25b3bc5fa8
--- /dev/null
+++ b/pkg/storage/fs/cephfs/utils.go
@@ -0,0 +1,245 @@
+// Copyright 2018-2021 CERN
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// In applying this license, CERN does not waive the privileges and immunities
+// granted to it by virtue of its status as an Intergovernmental Organization
+// or submit itself to any jurisdiction.
+
+//go:build ceph
+// +build ceph
+
+package cephfs
+
+import (
+	"crypto/md5"
+	"encoding/hex"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+
+	cephfs2 "github.com/ceph/go-ceph/cephfs"
+	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
+)
+
+// Mount type
+type Mount = *cephfs2.MountInfo
+
+// Statx type
+type Statx = *cephfs2.CephStatx
+
+var dirPermFull = uint32(0777)
+var dirPermDefault = uint32(0775)
+var filePermDefault = uint32(0660)
+
+// closeDir closes a cephfs directory handle; nil handles are ignored.
+func closeDir(directory *cephfs2.Directory) {
+	if directory != nil {
+		_ = directory.Close()
+	}
+}
+
+// closeFile closes a cephfs file handle; nil handles are ignored.
+func closeFile(file *cephfs2.File) {
+	if file != nil {
+		_ = file.Close()
+	}
+}
+
+// destroyCephConn releases a mount and its user permissions; it always
+// returns nil so it can be used directly in cache-miss return paths.
+func destroyCephConn(mt Mount, perm *cephfs2.UserPerm) *cacheVal {
+	if perm != nil {
+		perm.Destroy()
+	}
+	if mt != nil {
+		_ = mt.Release()
+	}
+	return nil
+}
+
+// deleteFile removes path best-effort, ignoring errors.
+func deleteFile(mount *cephfs2.MountInfo, path string) {
+	_ = mount.Unlink(path)
+}
+
+func isDir(t provider.ResourceType) bool {
+	return t == provider.ResourceType_RESOURCE_TYPE_CONTAINER
+}
+
+// makeFIDPath maps a file id to its index path.
+// NOTE(review): currently disabled — always returns "" until the entry-id
+// folder is wired up.
+func (fs *cephfs) makeFIDPath(fid string) string {
+	return "" //filepath.Join(fs.conf.EIDFolder, fid)
+}
+
+// makeFID derives a stable file id from the absolute path and inode and
+// best-effort records it as a hard link and an xattr on the entry.
+func (fs *cephfs) makeFID(absolutePath string, inode string) (rid *provider.ResourceId, err error) {
+	sum := md5.New()
+	sum.Write([]byte(absolutePath))
+	fid := fmt.Sprintf("%s-%s", hex.EncodeToString(sum.Sum(nil)), inode)
+	rid = &provider.ResourceId{OpaqueId: fid}
+
+	_ = fs.adminConn.adminMount.Link(absolutePath, fs.makeFIDPath(fid))
+	_ = fs.adminConn.adminMount.SetXattr(absolutePath, xattrEID, []byte(fid), 0)
+
+	return
+}
+
+// getFIDPath reads the entry's file id xattr and maps it to its index path.
+func (fs *cephfs) getFIDPath(cv *cacheVal, path string) (fid string, err error) {
+	var buffer []byte
+	if buffer, err = cv.mount.GetXattr(path, xattrEID); err != nil {
+		return
+	}
+
+	return fs.makeFIDPath(string(buffer)), err
+}
+
+// calcChecksum computes the md5 of the file and caches it (with the mtime it
+// was computed at) in xattrs so later stats can reuse it.
+func calcChecksum(filepath string, mt Mount, stat Statx) (checksum string, err error) {
+	file, err := mt.Open(filepath, os.O_RDONLY, 0)
+	if err != nil {
+		return
+	}
+	defer closeFile(file)
+
+	hash := md5.New()
+	if _, err = io.Copy(hash, file); err != nil {
+		return
+	}
+	checksum = hex.EncodeToString(hash.Sum(nil))
+	// we don't care if they fail, the checksum will just be recalculated if an error happens
+	_ = mt.SetXattr(filepath, xattrMd5ts, []byte(strconv.FormatInt(stat.Mtime.Sec, 10)), 0)
+	_ = mt.SetXattr(filepath, xattrMd5, []byte(checksum), 0)
+
+	return
+}
+
+// resolveRevRef resolves a reference against a snapshot revision key,
+// following the revision's symlink either by entry id or by path xattr.
+func resolveRevRef(mt Mount, ref *provider.Reference, revKey string) (str string, err error) {
+	var buf []byte
+	if ref.GetResourceId() != nil {
+		str, err = mt.Readlink(filepath.Join(snap, revKey, ref.ResourceId.OpaqueId))
+		if err != nil {
+			return "", fmt.Errorf("cephfs: invalid reference %+v", ref)
+		}
+	} else if str = ref.GetPath(); str != "" {
+		buf, err = mt.GetXattr(str, xattrEID)
+		if err != nil {
+			return
+		}
+		str, err = mt.Readlink(filepath.Join(snap, revKey, string(buf)))
+		if err != nil {
+			return
+		}
+	} else {
+		return "", fmt.Errorf("cephfs: empty reference %+v", ref)
+	}
+
+	return filepath.Join(snap, revKey, str), err
+}
+
+// removeLeadingSlash turns an absolute path into a mount-relative one.
+func removeLeadingSlash(path string) string {
+	return filepath.Join(".", path)
+}
+
+// addLeadingSlash makes a path absolute.
+func addLeadingSlash(path string) string {
+	return filepath.Join("/", path)
+}
+
+// in reports whether lookup is contained in list.
+func in(lookup string, list []string) bool {
+	for _, item := range list {
+		if item == lookup {
+			return true
+		}
+	}
+	return false
+}
+
+// pathGenerator streams every ancestor prefix of path on str — root-first
+// when reverse is false, leaf-first when reverse is true — and closes the
+// channel when done.
+func pathGenerator(path string, reverse bool, str chan string) {
+	if reverse {
+		str <- path
+		for i := range path {
+			if path[len(path)-i-1] == filepath.Separator {
+				str <- path[:len(path)-i-1]
+			}
+		}
+	} else {
+		for i := range path {
+			if path[i] == filepath.Separator {
+				str <- path[:i]
+			}
+		}
+		str <- path
+	}
+
+	close(str)
+}
+
+// walkPath applies f to every ancestor prefix of path (and path itself),
+// tolerating "already exists" and "not found" errors so it can be used for
+// mkdir -p style traversals; any other error from f aborts the walk.
+func walkPath(path string, f func(string) error, reverse bool) (err error) {
+	paths := make(chan string)
+	go pathGenerator(path, reverse, paths)
+	for path := range paths {
+		if path == "" {
+			continue
+		}
+		if err = f(path); err != nil && err.Error() != errFileExists && err.Error() != errNotFound {
+			break
+		}
+		err = nil
+	}
+
+	return
+}
+
+// writeIndex stores value as the full content of the rados object oid.
+func (fs *cephfs) writeIndex(oid string, value string) (err error) {
+	return fs.adminConn.radosIO.WriteFull(oid, []byte(value))
+}
+
+// removeIndex deletes the rados index object oid.
+func (fs *cephfs) removeIndex(oid string) error {
+	return fs.adminConn.radosIO.Delete(oid)
+}
+
+// resolveIndex follows the chain of "parentID/entryname" index objects in
+// rados, starting at oid, and reassembles the entry's full path, stopping
+// when the root marker is reached.
+func (fs *cephfs) resolveIndex(oid string) (fullPath string, err error) {
+	var currPath strings.Builder
+	root := string(filepath.Separator)
+	io := fs.adminConn.radosIO
+	bsize := 4096
+	buffer := make([]byte, bsize)
+	for {
+		offset := uint64(0)
+		for { //read the whole index object
+			var n int
+			n, err = io.Read(oid, buffer, offset)
+			// Only append the bytes actually read. The previous version
+			// wrote the entire 4 KiB buffer (corrupting the path with stale
+			// bytes) and then truncated the buffer to zero length, so any
+			// follow-up read could never return data.
+			if n > 0 {
+				currPath.Write(buffer[:n])
+				offset += uint64(n)
+			}
+			if err == nil && n >= bsize {
+				continue
+			}
+			break
+		}
+		if err != nil {
+			return
+		}
+
+		ss := strings.SplitN(currPath.String(), string(filepath.Separator), 2)
+		if len(ss) != 2 {
+			if currPath.String() == root {
+				return
+			}
+
+			return "", fmt.Errorf("cephfs: entry id is not in the form of \"parentID/entryname\"")
+		}
+		parentOID := ss[0]
+		entryName := ss[1]
+		fullPath = filepath.Join(entryName, fullPath)
+		oid = parentOID
+		currPath.Reset()
+	}
+}
diff --git a/pkg/storage/fs/loader/loader.go b/pkg/storage/fs/loader/loader.go
index f9ff86af040..cd88c5ddc79 100644
--- a/pkg/storage/fs/loader/loader.go
+++ b/pkg/storage/fs/loader/loader.go
@@ -20,6 +20,7 @@ package loader
 
 import (
 	// Load core storage filesystem backends.
+	_ "github.com/cs3org/reva/pkg/storage/fs/cephfs"
 	_ "github.com/cs3org/reva/pkg/storage/fs/eos"
 	_ "github.com/cs3org/reva/pkg/storage/fs/eosgrpc"
 	_ "github.com/cs3org/reva/pkg/storage/fs/eosgrpchome"