From f6788694d1639ef31c888c59c436e632d0261302 Mon Sep 17 00:00:00 2001 From: Giuseppe Lo Presti Date: Wed, 6 Nov 2024 16:02:08 +0100 Subject: [PATCH] cephfs refactoring + make home layout configurable (#4911) --- changelog/unreleased/ceph-refactoring.md | 3 + changelog/unreleased/enhance-ceph.md | 3 + go.mod | 1 + internal/grpc/services/gateway/gateway.go | 6 + .../grpc/services/gateway/storageprovider.go | 1182 +---------------- .../storageprovider/storageprovider.go | 18 +- .../http/services/datagateway/datagateway.go | 1 - .../http/services/owncloud/ocdav/ocdav.go | 19 +- .../http/services/owncloud/ocdav/propfind.go | 1 + .../owncloud/ocs/data/capabilities.go | 2 +- .../handlers/cloud/capabilities/uploads.go | 59 +- pkg/auth/manager/ldap/ldap.go | 10 +- pkg/storage/fs/cephfs/cephfs.go | 315 ++--- pkg/storage/fs/cephfs/chunking.go | 73 +- pkg/storage/fs/cephfs/connections.go | 28 +- pkg/storage/fs/cephfs/errors.go | 7 +- pkg/storage/fs/cephfs/options.go | 39 +- pkg/storage/fs/cephfs/upload.go | 70 +- pkg/storage/fs/cephfs/user.go | 53 +- pkg/storage/fs/cephfs/utils.go | 128 +- pkg/user/manager/ldap/ldap.go | 3 + 21 files changed, 358 insertions(+), 1663 deletions(-) create mode 100644 changelog/unreleased/ceph-refactoring.md create mode 100644 changelog/unreleased/enhance-ceph.md diff --git a/changelog/unreleased/ceph-refactoring.md b/changelog/unreleased/ceph-refactoring.md new file mode 100644 index 0000000000..94f1823977 --- /dev/null +++ b/changelog/unreleased/ceph-refactoring.md @@ -0,0 +1,3 @@ +Enhancement: cephfs refactoring + make home layout configurable + +https://github.com/cs3org/reva/pull/4911 diff --git a/changelog/unreleased/enhance-ceph.md b/changelog/unreleased/enhance-ceph.md new file mode 100644 index 0000000000..b4fc855ff7 --- /dev/null +++ b/changelog/unreleased/enhance-ceph.md @@ -0,0 +1,3 @@ +Enhancement: Refactor Ceph code + +https://github.com/cs3org/reva/pull/4824 diff --git a/go.mod b/go.mod index 43860145f6..f0e42d115a 100644 --- 
a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmizerany/pat v0.0.0-20210406213842-e4b6760bdd6f // indirect + github.com/cern-eos/go-eosgrpc v0.0.0-20240812132646-f105d2304f38 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/internal/grpc/services/gateway/gateway.go b/internal/grpc/services/gateway/gateway.go index 20bea533e9..ebaf168985 100644 --- a/internal/grpc/services/gateway/gateway.go +++ b/internal/grpc/services/gateway/gateway.go @@ -71,6 +71,7 @@ type config struct { EtagCacheTTL int `mapstructure:"etag_cache_ttl"` AllowedUserAgents map[string][]string `mapstructure:"allowed_user_agents"` // map[path][]user-agent CreateHomeCacheTTL int `mapstructure:"create_home_cache_ttl"` + HomeLayout string `mapstructure:"home_layout"` } // sets defaults. 
@@ -111,6 +112,11 @@ func (c *config) ApplyDefaults() { if c.TransferExpires == 0 { c.TransferExpires = 100 * 60 // seconds } + + // default to /home + if c.HomeLayout == "" { + c.HomeLayout = "/home" + } } type svc struct { diff --git a/internal/grpc/services/gateway/storageprovider.go b/internal/grpc/services/gateway/storageprovider.go index a9278a13c9..3d51a9eb00 100644 --- a/internal/grpc/services/gateway/storageprovider.go +++ b/internal/grpc/services/gateway/storageprovider.go @@ -20,7 +20,6 @@ package gateway import ( "context" - "fmt" "net/url" "path" "strings" @@ -28,7 +27,6 @@ import ( gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" - collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1" types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" @@ -37,14 +35,13 @@ import ( "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rgrpc/status" "github.com/cs3org/reva/pkg/rgrpc/todo/pool" - "github.com/cs3org/reva/pkg/storage/utils/etag" + "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/utils" "github.com/golang-jwt/jwt" "github.com/google/uuid" "github.com/pkg/errors" "google.golang.org/grpc/codes" gstatus "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/fieldmaskpb" ) // transferClaims are custom claims for a JWT token to be used between the metadata and data gateways. 
@@ -80,7 +77,6 @@ func (s *svc) sign(_ context.Context, target, versionKey string) (string, error) func (s *svc) CreateHome(ctx context.Context, req *provider.CreateHomeRequest) (*provider.CreateHomeResponse, error) { log := appctx.GetLogger(ctx) - home := s.getHome(ctx) c, err := s.findByPath(ctx, home) if err != nil { @@ -106,171 +102,30 @@ func (s *svc) GetHome(ctx context.Context, _ *provider.GetHomeRequest) (*provide }, nil } -func (s *svc) getHome(_ context.Context) string { - // TODO(labkode): issue #601, /home will be hardcoded. - return "/home" +func (s *svc) getHome(ctx context.Context) string { + u := appctx.ContextMustGetUser(ctx) + return templates.WithUser(u, s.c.HomeLayout) } func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFileDownloadRequest) (*gateway.InitiateFileDownloadResponse, error) { - log := appctx.GetLogger(ctx) - if utils.IsRelativeReference(req.Ref) { return s.initiateFileDownload(ctx, req) } - p, st := s.getPath(ctx, req.Ref) - if st.Code != rpc.Code_CODE_OK { - return &gateway.InitiateFileDownloadResponse{ - Status: st, - }, nil - } - - if !s.inSharedFolder(ctx, p) { - statReq := &provider.StatRequest{Ref: req.Ref} - statRes, err := s.stat(ctx, statReq) - if err != nil { - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+statReq.Ref.String()), - }, nil - } - if statRes.Status.Code != rpc.Code_CODE_OK { - return &gateway.InitiateFileDownloadResponse{ - Status: statRes.Status, - }, nil - } - return s.initiateFileDownload(ctx, req) - } - - if s.isSharedFolder(ctx, p) { - log.Debug().Str("path", p).Msg("path points to shared folder") - err := errtypes.PermissionDenied("gateway: cannot download share folder: path=" + p) - log.Err(err).Msg("gateway: error downloading") + statReq := &provider.StatRequest{Ref: req.Ref} + statRes, err := s.stat(ctx, statReq) + if err != nil { return &gateway.InitiateFileDownloadResponse{ - Status: 
status.NewInvalidArg(ctx, "path points to share folder"), + Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+statReq.Ref.String()), }, nil } - - if s.isShareName(ctx, p) { - statReq := &provider.StatRequest{Ref: req.Ref} - statRes, err := s.stat(ctx, statReq) - if err != nil { - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+statReq.Ref.String()), - }, nil - } - if statRes.Status.Code != rpc.Code_CODE_OK { - return &gateway.InitiateFileDownloadResponse{ - Status: statRes.Status, - }, nil - } - - if statRes.Info.Type != provider.ResourceType_RESOURCE_TYPE_REFERENCE { - err := errtypes.BadRequest(fmt.Sprintf("gateway: expected reference: got:%+v", statRes.Info)) - log.Err(err).Msg("gateway: error stating share name") - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewInternal(ctx, err, "gateway: error initiating download"), - }, nil - } - - ri, protocol, err := s.checkRef(ctx, statRes.Info) - if err != nil { - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+statRes.Info.Target, err), - }, nil - } - - if protocol == "webdav" { - // TODO(ishank011): pass this through the datagateway service - // For now, we just expose the file server to the user - ep, opaque, err := s.webdavRefTransferEndpoint(ctx, statRes.Info.Target) - if err != nil { - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewInternal(ctx, err, "gateway: error downloading from webdav host: "+p), - }, nil - } - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewOK(ctx), - Protocols: []*gateway.FileDownloadProtocol{ - { - Opaque: opaque, - Protocol: "simple", - DownloadEndpoint: ep, - }, - }, - }, nil - } - - // if it is a file allow download - if ri.Type == provider.ResourceType_RESOURCE_TYPE_FILE { - log.Debug().Str("path", p).Interface("ri", ri).Msg("path points to share name file") - req.Ref.Path = 
ri.Path - log.Debug().Str("path", ri.Path).Msg("download") - return s.initiateFileDownload(ctx, req) - } - - log.Debug().Str("path", p).Interface("statRes", statRes).Msg("path:%s points to share name") - err = errtypes.PermissionDenied("gateway: cannot download share name: path=" + p) - log.Err(err).Str("path", p).Msg("gateway: error downloading") + if statRes.Status.Code != rpc.Code_CODE_OK { return &gateway.InitiateFileDownloadResponse{ - Status: status.NewInvalidArg(ctx, "path points to share name"), + Status: statRes.Status, }, nil } + return s.initiateFileDownload(ctx, req) - if s.isShareChild(ctx, p) { - log.Debug().Msgf("shared child: %s", p) - shareName, shareChild := s.splitShare(ctx, p) - - statReq := &provider.StatRequest{ - Ref: &provider.Reference{Path: shareName}, - } - statRes, err := s.stat(ctx, statReq) - if err != nil { - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+statReq.Ref.String()), - }, nil - } - - if statRes.Status.Code != rpc.Code_CODE_OK { - return &gateway.InitiateFileDownloadResponse{ - Status: statRes.Status, - }, nil - } - - ri, protocol, err := s.checkRef(ctx, statRes.Info) - if err != nil { - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+statRes.Info.Target, err), - }, nil - } - - if protocol == "webdav" { - // TODO(ishank011): pass this through the datagateway service - // For now, we just expose the file server to the user - ep, opaque, err := s.webdavRefTransferEndpoint(ctx, statRes.Info.Target, shareChild) - if err != nil { - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewInternal(ctx, err, "gateway: error downloading from webdav host: "+p), - }, nil - } - return &gateway.InitiateFileDownloadResponse{ - Status: status.NewOK(ctx), - Protocols: []*gateway.FileDownloadProtocol{ - { - Opaque: opaque, - Protocol: "simple", - DownloadEndpoint: ep, - }, - }, - }, nil - } - - 
// append child to target - req.Ref.Path = path.Join(ri.Path, shareChild) - log.Debug().Str("path", req.Ref.Path).Msg("download") - return s.initiateFileDownload(ctx, req) - } - - panic("gateway: download: unknown path:" + p) } func versionKey(req *provider.InitiateFileDownloadRequest) string { @@ -340,148 +195,7 @@ func (s *svc) initiateFileDownload(ctx context.Context, req *provider.InitiateFi } func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*gateway.InitiateFileUploadResponse, error) { - log := appctx.GetLogger(ctx) - if utils.IsRelativeReference(req.Ref) { - return s.initiateFileUpload(ctx, req) - } - p, st := s.getPath(ctx, req.Ref) - if st.Code != rpc.Code_CODE_OK { - return &gateway.InitiateFileUploadResponse{ - Status: st, - }, nil - } - - if !s.inSharedFolder(ctx, p) { - return s.initiateFileUpload(ctx, req) - } - - if s.isSharedFolder(ctx, p) { - log.Debug().Str("path", p).Msg("path points to shared folder") - err := errtypes.PermissionDenied("gateway: cannot upload to share folder: path=" + p) - log.Err(err).Msg("gateway: error downloading") - return &gateway.InitiateFileUploadResponse{ - Status: status.NewInvalidArg(ctx, "path points to share folder"), - }, nil - } - - if s.isShareName(ctx, p) { - log.Debug().Str("path", p).Msg("path points to share name") - statReq := &provider.StatRequest{Ref: req.Ref} - statRes, err := s.stat(ctx, statReq) - if err != nil { - return &gateway.InitiateFileUploadResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+statReq.Ref.String()), - }, nil - } - if statRes.Status.Code != rpc.Code_CODE_OK { - return &gateway.InitiateFileUploadResponse{ - Status: statRes.Status, - }, nil - } - - if statRes.Info.Type != provider.ResourceType_RESOURCE_TYPE_REFERENCE { - err := errtypes.BadRequest(fmt.Sprintf("gateway: expected reference: got:%+v", statRes.Info)) - log.Err(err).Msg("gateway: error stating share name") - return 
&gateway.InitiateFileUploadResponse{ - Status: status.NewInternal(ctx, err, "gateway: error initiating upload"), - }, nil - } - - ri, protocol, err := s.checkRef(ctx, statRes.Info) - if err != nil { - return &gateway.InitiateFileUploadResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+statRes.Info.Target, err), - }, nil - } - - if protocol == "webdav" { - // TODO(ishank011): pass this through the datagateway service - // For now, we just expose the file server to the user - ep, opaque, err := s.webdavRefTransferEndpoint(ctx, statRes.Info.Target) - if err != nil { - return &gateway.InitiateFileUploadResponse{ - Status: status.NewInternal(ctx, err, "gateway: error downloading from webdav host: "+p), - }, nil - } - return &gateway.InitiateFileUploadResponse{ - Status: status.NewOK(ctx), - Protocols: []*gateway.FileUploadProtocol{ - { - Opaque: opaque, - Protocol: "simple", - UploadEndpoint: ep, - }, - }, - }, nil - } - - // if it is a file allow upload - if ri.Type == provider.ResourceType_RESOURCE_TYPE_FILE { - log.Debug().Str("path", p).Interface("ri", ri).Msg("path points to share name file") - req.Ref.Path = ri.Path - log.Debug().Str("path", ri.Path).Msg("upload") - return s.initiateFileUpload(ctx, req) - } - - err = errtypes.PermissionDenied("gateway: cannot upload to share name: path=" + p) - log.Err(err).Msg("gateway: error uploading") - return &gateway.InitiateFileUploadResponse{ - Status: status.NewInvalidArg(ctx, "path points to share name"), - }, nil - } - - if s.isShareChild(ctx, p) { - log.Debug().Msgf("shared child: %s", p) - shareName, shareChild := s.splitShare(ctx, p) - - statReq := &provider.StatRequest{Ref: &provider.Reference{Path: shareName}} - statRes, err := s.stat(ctx, statReq) - if err != nil { - return &gateway.InitiateFileUploadResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+statReq.Ref.String()), - }, nil - } - - if statRes.Status.Code != rpc.Code_CODE_OK { - return 
&gateway.InitiateFileUploadResponse{ - Status: statRes.Status, - }, nil - } - - ri, protocol, err := s.checkRef(ctx, statRes.Info) - if err != nil { - return &gateway.InitiateFileUploadResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+statRes.Info.Target, err), - }, nil - } - - if protocol == "webdav" { - // TODO(ishank011): pass this through the datagateway service - // For now, we just expose the file server to the user - ep, opaque, err := s.webdavRefTransferEndpoint(ctx, statRes.Info.Target, shareChild) - if err != nil { - return &gateway.InitiateFileUploadResponse{ - Status: status.NewInternal(ctx, err, "gateway: error uploading to webdav host: "+p), - }, nil - } - return &gateway.InitiateFileUploadResponse{ - Status: status.NewOK(ctx), - Protocols: []*gateway.FileUploadProtocol{ - { - Opaque: opaque, - Protocol: "simple", - UploadEndpoint: ep, - }, - }, - }, nil - } - - // append child to target - req.Ref.Path = path.Join(ri.Path, shareChild) - return s.initiateFileUpload(ctx, req) - } - - panic("gateway: upload: unknown path:" + p) + return s.initiateFileUpload(ctx, req) } func (s *svc) initiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*gateway.InitiateFileUploadResponse, error) { @@ -566,75 +280,11 @@ func (s *svc) GetPath(ctx context.Context, req *provider.GetPathRequest) (*provi } func (s *svc) CreateContainer(ctx context.Context, req *provider.CreateContainerRequest) (*provider.CreateContainerResponse, error) { - log := appctx.GetLogger(ctx) - if utils.IsRelativeReference(req.Ref) { return s.createContainer(ctx, req) } - p, st := s.getPath(ctx, req.Ref) - if st.Code != rpc.Code_CODE_OK { - return &provider.CreateContainerResponse{ - Status: st, - }, nil - } - - if !s.inSharedFolder(ctx, p) { - return s.createContainer(ctx, req) - } - - if s.isSharedFolder(ctx, p) || s.isShareName(ctx, p) { - log.Debug().Msgf("path:%s points to shared folder or share name", p) - err := 
errtypes.PermissionDenied("gateway: cannot create container on share folder or share name: path=" + p) - log.Err(err).Msg("gateway: error creating container") - return &provider.CreateContainerResponse{ - Status: status.NewInvalidArg(ctx, "path points to share folder or share name"), - }, nil - } - - if s.isShareChild(ctx, p) { - log.Debug().Msgf("shared child: %s", p) - shareName, shareChild := s.splitShare(ctx, p) - - statReq := &provider.StatRequest{Ref: &provider.Reference{Path: shareName}} - statRes, err := s.stat(ctx, statReq) - if err != nil { - return &provider.CreateContainerResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+statReq.Ref.String()), - }, nil - } - - if statRes.Status.Code != rpc.Code_CODE_OK { - return &provider.CreateContainerResponse{ - Status: statRes.Status, - }, nil - } - - ri, protocol, err := s.checkRef(ctx, statRes.Info) - if err != nil { - return &provider.CreateContainerResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+statRes.Info.Target, err), - }, nil - } - - if protocol == "webdav" { - err = s.webdavRefMkdir(ctx, statRes.Info.Target, shareChild) - if err != nil { - return &provider.CreateContainerResponse{ - Status: status.NewInternal(ctx, err, "gateway: error creating container on webdav host: "+p), - }, nil - } - return &provider.CreateContainerResponse{ - Status: status.NewOK(ctx), - }, nil - } - - // append child to target - req.Ref.Path = path.Join(ri.Path, shareChild) - return s.createContainer(ctx, req) - } - - panic("gateway: create container on unknown path:" + p) + return s.createContainer(ctx, req) } func (s *svc) createContainer(ctx context.Context, req *provider.CreateContainerRequest) (*provider.CreateContainerResponse, error) { @@ -675,137 +325,8 @@ func (s *svc) TouchFile(ctx context.Context, req *provider.TouchFileRequest) (*p return res, nil } -// check if the path contains the prefix of the shared folder. 
-func (s *svc) inSharedFolder(ctx context.Context, p string) bool { - sharedFolder := s.getSharedFolder(ctx) - return strings.HasPrefix(p, sharedFolder) -} - func (s *svc) Delete(ctx context.Context, req *provider.DeleteRequest) (*provider.DeleteResponse, error) { - log := appctx.GetLogger(ctx) - p, st := s.getPath(ctx, req.Ref) - if st.Code != rpc.Code_CODE_OK { - return &provider.DeleteResponse{ - Status: st, - }, nil - } - - if !s.inSharedFolder(ctx, p) { - return s.delete(ctx, req) - } - - if s.isSharedFolder(ctx, p) { - // TODO(labkode): deleting share names should be allowed, means unmounting. - return &provider.DeleteResponse{ - Status: status.NewInvalidArg(ctx, "path points to share folder or share name"), - }, nil - } - - if s.isShareName(ctx, p) { - log.Debug().Msgf("path:%s points to share name", p) - - sRes, err := s.ListReceivedShares(ctx, &collaboration.ListReceivedSharesRequest{}) - if err != nil { - return nil, err - } - - statRes, err := s.Stat(ctx, &provider.StatRequest{ - Ref: &provider.Reference{ - Path: p, - }, - }) - if err != nil { - return nil, err - } - - // the following will check that: - // - the resource to delete is a share the current user received - // - signal the storage the delete must not land in the trashbin - // - delete the resource and update the share status to "rejected" - for _, share := range sRes.Shares { - if statRes != nil && (share.Share.ResourceId.OpaqueId == statRes.Info.Id.OpaqueId) && (share.Share.ResourceId.StorageId == statRes.Info.Id.StorageId) { - // this opaque needs explanation. It signals the storage the resource we're about to delete does not - // belong to the current user because it was share to her, thus delete the "node" and don't send it to - // the trash bin, since the share can be mounted as many times as desired. 
- req.Opaque = &types.Opaque{ - Map: map[string]*types.OpaqueEntry{ - "deleting_shared_resource": { - Value: []byte("true"), - Decoder: "plain", - }, - }, - } - - // the following block takes care of updating the state of the share to "rejected". This will ensure the user - // can "Accept" the share once again. - // TODO should this be pending? If so, update the two comments above as well. If not, get rid of this comment. - share.State = collaboration.ShareState_SHARE_STATE_REJECTED - r := &collaboration.UpdateReceivedShareRequest{ - Share: share, - UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"state"}}, - } - - _, err := s.UpdateReceivedShare(ctx, r) - if err != nil { - return nil, err - } - - return &provider.DeleteResponse{ - Status: status.NewOK(ctx), - }, nil - } - } - - return &provider.DeleteResponse{ - Status: status.NewNotFound(ctx, "could not find share"), - }, nil - } - - if s.isShareChild(ctx, p) { - shareName, shareChild := s.splitShare(ctx, p) - log.Debug().Msgf("path:%s sharename:%s sharechild: %s", p, shareName, shareChild) - - ref := &provider.Reference{Path: shareName} - - statReq := &provider.StatRequest{Ref: ref} - statRes, err := s.stat(ctx, statReq) - if err != nil { - return &provider.DeleteResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+statReq.Ref.String()), - }, nil - } - - if statRes.Status.Code != rpc.Code_CODE_OK { - return &provider.DeleteResponse{ - Status: statRes.Status, - }, nil - } - - ri, protocol, err := s.checkRef(ctx, statRes.Info) - if err != nil { - return &provider.DeleteResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+statRes.Info.Target, err), - }, nil - } - - if protocol == "webdav" { - err = s.webdavRefDelete(ctx, statRes.Info.Target, shareChild) - if err != nil { - return &provider.DeleteResponse{ - Status: status.NewInternal(ctx, err, "gateway: error deleting resource on webdav host: "+p), - }, nil - } - return &provider.DeleteResponse{ - Status: 
status.NewOK(ctx), - }, nil - } - - // append child to target - req.Ref.Path = path.Join(ri.Path, shareChild) - return s.delete(ctx, req) - } - - panic("gateway: delete called on unknown path:" + p) + return s.delete(ctx, req) } func (s *svc) delete(ctx context.Context, req *provider.DeleteRequest) (*provider.DeleteResponse, error) { @@ -829,118 +350,7 @@ func (s *svc) delete(ctx context.Context, req *provider.DeleteRequest) (*provide } func (s *svc) Move(ctx context.Context, req *provider.MoveRequest) (*provider.MoveResponse, error) { - log := appctx.GetLogger(ctx) - p, st := s.getPath(ctx, req.Source) - if st.Code != rpc.Code_CODE_OK { - return &provider.MoveResponse{ - Status: st, - }, nil - } - - dp, st := s.getPath(ctx, req.Destination) - if st.Code != rpc.Code_CODE_OK && st.Code != rpc.Code_CODE_NOT_FOUND { - return &provider.MoveResponse{ - Status: st, - }, nil - } - - if !s.inSharedFolder(ctx, p) && !s.inSharedFolder(ctx, dp) { - return s.move(ctx, req) - } - - // allow renaming the share folder, the mount point, not the target. - if s.isShareName(ctx, p) && s.isShareName(ctx, dp) { - log.Info().Msgf("gateway: move: renaming share mountpoint: from:%s to:%s", p, dp) - return s.move(ctx, req) - } - - // resolve references and check the ref points to the same base path, paranoia check. 
- if s.isShareChild(ctx, p) && s.isShareChild(ctx, dp) { - shareName, shareChild := s.splitShare(ctx, p) - dshareName, dshareChild := s.splitShare(ctx, dp) - log.Debug().Msgf("srcpath:%s dstpath:%s srcsharename:%s srcsharechild: %s dstsharename:%s dstsharechild:%s ", p, dp, shareName, shareChild, dshareName, dshareChild) - - srcStatReq := &provider.StatRequest{Ref: &provider.Reference{Path: shareName}} - srcStatRes, err := s.stat(ctx, srcStatReq) - if err != nil { - return &provider.MoveResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+srcStatReq.Ref.String()), - }, nil - } - - if srcStatRes.Status.Code != rpc.Code_CODE_OK { - return &provider.MoveResponse{ - Status: srcStatRes.Status, - }, nil - } - - dstStatReq := &provider.StatRequest{Ref: &provider.Reference{Path: dshareName}} - dstStatRes, err := s.stat(ctx, dstStatReq) - if err != nil { - return &provider.MoveResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+srcStatReq.Ref.String()), - }, nil - } - - if dstStatRes.Status.Code != rpc.Code_CODE_OK { - return &provider.MoveResponse{ - Status: srcStatRes.Status, - }, nil - } - - srcRi, srcProtocol, err := s.checkRef(ctx, srcStatRes.Info) - if err != nil { - return &provider.MoveResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+srcStatRes.Info.Target, err), - }, nil - } - - if srcProtocol == "webdav" { - err = s.webdavRefMove(ctx, dstStatRes.Info.Target, shareChild, dshareChild) - if err != nil { - return &provider.MoveResponse{ - Status: status.NewInternal(ctx, err, "gateway: error moving resource on webdav host: "+p), - }, nil - } - return &provider.MoveResponse{ - Status: status.NewOK(ctx), - }, nil - } - dstRi, dstProtocol, err := s.checkRef(ctx, dstStatRes.Info) - if err != nil { - return &provider.MoveResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+srcStatRes.Info.Target, err), - }, nil - } - - if dstProtocol == "webdav" { - err = 
s.webdavRefMove(ctx, dstStatRes.Info.Target, shareChild, dshareChild) - if err != nil { - return &provider.MoveResponse{ - Status: status.NewInternal(ctx, err, "gateway: error moving resource on webdav host: "+p), - }, nil - } - return &provider.MoveResponse{ - Status: status.NewOK(ctx), - }, nil - } - - src := &provider.Reference{ - Path: path.Join(srcRi.Path, shareChild), - } - dst := &provider.Reference{ - Path: path.Join(dstRi.Path, dshareChild), - } - - req.Source = src - req.Destination = dst - - return s.move(ctx, req) - } - - return &provider.MoveResponse{ - Status: status.NewStatusFromErrType(ctx, "move", errtypes.BadRequest("gateway: move called on unknown path: "+p)), - }, nil + return s.move(ctx, req) } func (s *svc) move(ctx context.Context, req *provider.MoveRequest) (*provider.MoveResponse, error) { @@ -1050,151 +460,60 @@ func (s *svc) SetLock(ctx context.Context, req *provider.SetLockRequest) (*provi func (s *svc) GetLock(ctx context.Context, req *provider.GetLockRequest) (*provider.GetLockResponse, error) { c, err := s.find(ctx, req.Ref) if err != nil { - return &provider.GetLockResponse{ - Status: status.NewStatusFromErrType(ctx, "GetLock ref="+req.Ref.String(), err), - }, nil - } - - res, err := c.GetLock(ctx, req) - if err != nil { - if gstatus.Code(err) == codes.PermissionDenied { - return &provider.GetLockResponse{Status: &rpc.Status{Code: rpc.Code_CODE_PERMISSION_DENIED}}, nil - } - return nil, errors.Wrap(err, "gateway: error calling GetLock") - } - - return res, nil -} - -// RefreshLock refreshes an existing lock on the given reference. 
-func (s *svc) RefreshLock(ctx context.Context, req *provider.RefreshLockRequest) (*provider.RefreshLockResponse, error) { - c, err := s.find(ctx, req.Ref) - if err != nil { - return &provider.RefreshLockResponse{ - Status: status.NewStatusFromErrType(ctx, "RefreshLock ref="+req.Ref.String(), err), - }, nil - } - - res, err := c.RefreshLock(ctx, req) - if err != nil { - if gstatus.Code(err) == codes.PermissionDenied { - return &provider.RefreshLockResponse{Status: &rpc.Status{Code: rpc.Code_CODE_PERMISSION_DENIED}}, nil - } - return nil, errors.Wrap(err, "gateway: error calling RefreshLock") - } - - return res, nil -} - -// Unlock removes an existing lock from the given reference. -func (s *svc) Unlock(ctx context.Context, req *provider.UnlockRequest) (*provider.UnlockResponse, error) { - c, err := s.find(ctx, req.Ref) - if err != nil { - return &provider.UnlockResponse{ - Status: status.NewStatusFromErrType(ctx, "Unlock ref="+req.Ref.String(), err), - }, nil - } - - res, err := c.Unlock(ctx, req) - if err != nil { - if gstatus.Code(err) == codes.PermissionDenied { - return &provider.UnlockResponse{Status: &rpc.Status{Code: rpc.Code_CODE_PERMISSION_DENIED}}, nil - } - return nil, errors.Wrap(err, "gateway: error calling Unlock") - } - - return res, nil -} - -func (s *svc) statHome(ctx context.Context) (*provider.StatResponse, error) { - statRes, err := s.stat(ctx, &provider.StatRequest{Ref: &provider.Reference{Path: s.getHome(ctx)}}) - if err != nil { - return &provider.StatResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating home"), - }, nil - } - - if statRes.Status.Code != rpc.Code_CODE_OK { - return &provider.StatResponse{ - Status: statRes.Status, - }, nil - } - - statSharedFolder, err := s.statSharesFolder(ctx) - if err != nil { - return &provider.StatResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating shares folder"), - }, nil - } - if statSharedFolder.Status.Code != rpc.Code_CODE_OK { - // If shares folder is not 
found, skip updating the etag - if statSharedFolder.Status.Code == rpc.Code_CODE_NOT_FOUND { - return statRes, nil - } - // otherwise return stat of share folder - return &provider.StatResponse{ - Status: statSharedFolder.Status, + return &provider.GetLockResponse{ + Status: status.NewStatusFromErrType(ctx, "GetLock ref="+req.Ref.String(), err), }, nil } - if etagIface, err := s.etagCache.Get(statRes.Info.Owner.OpaqueId + ":" + statRes.Info.Path); err == nil { - resMtime := utils.TSToTime(statRes.Info.Mtime) - resEtag := etagIface.(etagWithTS) - // Use the updated etag if the home folder has been modified - if resMtime.Before(resEtag.Timestamp) { - statRes.Info.Etag = resEtag.Etag - } - } else { - statRes.Info.Etag = etag.GenerateEtagFromResources(statRes.Info, []*provider.ResourceInfo{statSharedFolder.Info}) - if s.c.EtagCacheTTL > 0 { - _ = s.etagCache.Set(statRes.Info.Owner.OpaqueId+":"+statRes.Info.Path, etagWithTS{statRes.Info.Etag, time.Now()}) + res, err := c.GetLock(ctx, req) + if err != nil { + if gstatus.Code(err) == codes.PermissionDenied { + return &provider.GetLockResponse{Status: &rpc.Status{Code: rpc.Code_CODE_PERMISSION_DENIED}}, nil } + return nil, errors.Wrap(err, "gateway: error calling GetLock") } - return statRes, nil + return res, nil } -func (s *svc) statSharesFolder(ctx context.Context) (*provider.StatResponse, error) { - statRes, err := s.stat(ctx, &provider.StatRequest{Ref: &provider.Reference{Path: s.getSharedFolder(ctx)}}) +// RefreshLock refreshes an existing lock on the given reference. 
+func (s *svc) RefreshLock(ctx context.Context, req *provider.RefreshLockRequest) (*provider.RefreshLockResponse, error) { + c, err := s.find(ctx, req.Ref) if err != nil { - return &provider.StatResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating shares folder"), + return &provider.RefreshLockResponse{ + Status: status.NewStatusFromErrType(ctx, "RefreshLock ref="+req.Ref.String(), err), }, nil } - if statRes.Status.Code != rpc.Code_CODE_OK { - return &provider.StatResponse{ - Status: statRes.Status, - }, nil + res, err := c.RefreshLock(ctx, req) + if err != nil { + if gstatus.Code(err) == codes.PermissionDenied { + return &provider.RefreshLockResponse{Status: &rpc.Status{Code: rpc.Code_CODE_PERMISSION_DENIED}}, nil + } + return nil, errors.Wrap(err, "gateway: error calling RefreshLock") } - lsRes, err := s.listSharesFolder(ctx) + return res, nil +} + +// Unlock removes an existing lock from the given reference. +func (s *svc) Unlock(ctx context.Context, req *provider.UnlockRequest) (*provider.UnlockResponse, error) { + c, err := s.find(ctx, req.Ref) if err != nil { - return &provider.StatResponse{ - Status: status.NewInternal(ctx, err, "gateway: error listing shares folder"), - }, nil - } - if lsRes.Status.Code != rpc.Code_CODE_OK { - return &provider.StatResponse{ - Status: lsRes.Status, + return &provider.UnlockResponse{ + Status: status.NewStatusFromErrType(ctx, "Unlock ref="+req.Ref.String(), err), }, nil } - if etagIface, err := s.etagCache.Get(statRes.Info.Owner.OpaqueId + ":" + statRes.Info.Path); err == nil { - resMtime := utils.TSToTime(statRes.Info.Mtime) - resEtag := etagIface.(etagWithTS) - // Use the updated etag if the shares folder has been modified, i.e., a new - // reference has been created. 
- if resMtime.Before(resEtag.Timestamp) { - statRes.Info.Etag = resEtag.Etag - } - } else { - statRes.Info.Etag = etag.GenerateEtagFromResources(statRes.Info, lsRes.Infos) - if s.c.EtagCacheTTL > 0 { - _ = s.etagCache.Set(statRes.Info.Owner.OpaqueId+":"+statRes.Info.Path, etagWithTS{statRes.Info.Etag, time.Now()}) + res, err := c.Unlock(ctx, req) + if err != nil { + if gstatus.Code(err) == codes.PermissionDenied { + return &provider.UnlockResponse{Status: &rpc.Status{Code: rpc.Code_CODE_PERMISSION_DENIED}}, nil } + return nil, errors.Wrap(err, "gateway: error calling Unlock") } - return statRes, nil + + return res, nil } func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.StatResponse, error) { @@ -1245,147 +564,7 @@ func (s *svc) Stat(ctx context.Context, req *provider.StatRequest) (*provider.St if utils.IsRelativeReference(req.Ref) { return s.stat(ctx, req) } - - p := "" - var res *provider.StatResponse - var err error - if utils.IsAbsolutePathReference(req.Ref) { - p = req.Ref.Path - } else { - // Reference by just resource ID - // Stat it and store for future use - res, err = s.stat(ctx, req) - if err != nil { - return &provider.StatResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+req.Ref.String()), - }, nil - } - if res != nil && res.Status.Code != rpc.Code_CODE_OK { - return res, nil - } - p = res.Info.Path - } - - if path.Clean(p) == s.getHome(ctx) { - return s.statHome(ctx) - } - - if s.isSharedFolder(ctx, p) { - return s.statSharesFolder(ctx) - } - - if !s.inSharedFolder(ctx, p) { - if res != nil { - return res, nil - } - return s.stat(ctx, req) - } - - // we need to provide the info of the target, not the reference. - if s.isShareName(ctx, p) { - // If we haven't returned an error by now and res is nil, it means that - // req is an absolute path based ref, so we didn't stat it previously. 
- // So stat it now - if res == nil { - res, err = s.stat(ctx, req) - if err != nil { - return &provider.StatResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+req.Ref.String()), - }, nil - } - - if res.Status.Code != rpc.Code_CODE_OK { - return &provider.StatResponse{ - Status: res.Status, - }, nil - } - } - - ri, protocol, err := s.checkRef(ctx, res.Info) - if err != nil { - return &provider.StatResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+res.Info.Target, err), - }, nil - } - - if protocol == "webdav" { - ri, err = s.webdavRefStat(ctx, res.Info.Target) - if err != nil { - return &provider.StatResponse{ - Status: status.NewInternal(ctx, err, "gateway: error resolving webdav reference: "+p), - }, nil - } - } - - // we need to make sure we don't expose the reference target in the resource - // information. For example, if requests comes to: /home/MyShares/photos and photos - // is reference to /user/peter/Holidays/photos, we need to still return to the user - // /home/MyShares/photos - orgPath := res.Info.Path - res.Info = ri - res.Info.Path = orgPath - return res, nil - } - - if s.isShareChild(ctx, p) { - shareName, shareChild := s.splitShare(ctx, p) - - statReq := &provider.StatRequest{Ref: &provider.Reference{Path: shareName}} - statRes, err := s.stat(ctx, statReq) - if err != nil { - return &provider.StatResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+statReq.Ref.String()), - }, nil - } - - if statRes.Status.Code != rpc.Code_CODE_OK { - return &provider.StatResponse{ - Status: statRes.Status, - }, nil - } - - ri, protocol, err := s.checkRef(ctx, statRes.Info) - if err != nil { - return &provider.StatResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+statRes.Info.Target, err), - }, nil - } - - if protocol == "webdav" { - ri, err = s.webdavRefStat(ctx, statRes.Info.Target, shareChild) - if err != nil { - return &provider.StatResponse{ - 
Status: status.NewInternal(ctx, err, "gateway: error resolving webdav reference: "+p), - }, nil - } - ri.Path = p - return &provider.StatResponse{ - Status: status.NewOK(ctx), - Info: ri, - }, nil - } - - // append child to target - req.Ref.Path = path.Join(ri.Path, shareChild) - res, err := s.stat(ctx, req) - if err != nil { - return &provider.StatResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+req.Ref.String()), - }, nil - } - if res.Status.Code != rpc.Code_CODE_OK { - return &provider.StatResponse{ - Status: res.Status, - }, nil - } - - // we need to make sure we don't expose the reference target in the resource - // information. - res.Info.Path = p - return res, nil - } - - panic("gateway: stating an unknown path:" + p) + return s.stat(ctx, req) } func (s *svc) checkRef(ctx context.Context, ri *provider.ResourceInfo) (*provider.ResourceInfo, string, error) { @@ -1467,83 +646,6 @@ func (s *svc) ListContainerStream(_ *provider.ListContainerStreamRequest, _ gate return errtypes.NotSupported("Unimplemented") } -func (s *svc) listHome(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) { - lcr, err := s.listContainer(ctx, &provider.ListContainerRequest{ - Ref: &provider.Reference{Path: s.getHome(ctx)}, - ArbitraryMetadataKeys: req.ArbitraryMetadataKeys, - }) - if err != nil { - return &provider.ListContainerResponse{ - Status: status.NewInternal(ctx, err, "gateway: error listing home"), - }, nil - } - if lcr.Status.Code != rpc.Code_CODE_OK { - return &provider.ListContainerResponse{ - Status: lcr.Status, - }, nil - } - - for i := range lcr.Infos { - if s.isSharedFolder(ctx, lcr.Infos[i].GetPath()) { - statSharedFolder, err := s.statSharesFolder(ctx) - if err != nil { - return &provider.ListContainerResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating shares folder"), - }, nil - } - if statSharedFolder.Status.Code != rpc.Code_CODE_OK { - return 
&provider.ListContainerResponse{ - Status: statSharedFolder.Status, - }, nil - } - lcr.Infos[i] = statSharedFolder.Info - break - } - } - - return lcr, nil -} - -func (s *svc) listSharesFolder(ctx context.Context) (*provider.ListContainerResponse, error) { - lcr, err := s.listContainer(ctx, &provider.ListContainerRequest{Ref: &provider.Reference{Path: s.getSharedFolder(ctx)}}) - if err != nil { - return &provider.ListContainerResponse{ - Status: status.NewInternal(ctx, err, "gateway: error listing shared folder"), - }, nil - } - if lcr.Status.Code != rpc.Code_CODE_OK { - return &provider.ListContainerResponse{ - Status: lcr.Status, - }, nil - } - checkedInfos := make([]*provider.ResourceInfo, 0) - for i := range lcr.Infos { - info, protocol, err := s.checkRef(ctx, lcr.Infos[i]) - if err != nil { - // create status to log the proper messages - // this might arise when the shared resource has been moved to the recycle bin - // this might arise when the resource was unshared, but the share reference was not removed - status.NewStatusFromErrType(ctx, "error resolving reference "+lcr.Infos[i].Target, err) - // continue on errors so the user can see a list of the working shares - continue - } - - if protocol == "webdav" { - info, err = s.webdavRefStat(ctx, lcr.Infos[i].Target) - if err != nil { - // Might be the case that the webdav token has expired - continue - } - } - - info.Path = lcr.Infos[i].Path - checkedInfos = append(checkedInfos, info) - } - lcr.Infos = checkedInfos - - return lcr, nil -} - func (s *svc) filterProvidersByUserAgent(ctx context.Context, providers []*registry.ProviderInfo) []*registry.ProviderInfo { cat, ok := appctx.ContextGetUserAgentCategory(ctx) if !ok { @@ -1660,183 +762,7 @@ func (s *svc) listContainerAcrossProviders(ctx context.Context, req *provider.Li } func (s *svc) ListContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) { - log := appctx.GetLogger(ctx) - - if 
utils.IsRelativeReference(req.Ref) { - return s.listContainer(ctx, req) - } - - p, st := s.getPath(ctx, req.Ref, req.ArbitraryMetadataKeys...) - if st.Code != rpc.Code_CODE_OK { - return &provider.ListContainerResponse{ - Status: st, - }, nil - } - - if path.Clean(p) == s.getHome(ctx) { - return s.listHome(ctx, req) - } - - if s.isSharedFolder(ctx, p) { - return s.listSharesFolder(ctx) - } - - if !s.inSharedFolder(ctx, p) { - return s.listContainer(ctx, req) - } - - // we need to provide the info of the target, not the reference. - if s.isShareName(ctx, p) { - statReq := &provider.StatRequest{Ref: &provider.Reference{Path: p}} - statRes, err := s.stat(ctx, statReq) - if err != nil { - return &provider.ListContainerResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating share:"+statReq.Ref.String()), - }, nil - } - - if statRes.Status.Code != rpc.Code_CODE_OK { - return &provider.ListContainerResponse{ - Status: statRes.Status, - }, nil - } - - ri, protocol, err := s.checkRef(ctx, statRes.Info) - if err != nil { - return &provider.ListContainerResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+statRes.Info.Target, err), - }, nil - } - - if protocol == "webdav" { - infos, err := s.webdavRefLs(ctx, statRes.Info.Target) - if err != nil { - return &provider.ListContainerResponse{ - Status: status.NewInternal(ctx, err, "gateway: error listing webdav reference: "+p), - }, nil - } - - for _, info := range infos { - base := path.Base(info.Path) - info.Path = path.Join(p, base) - } - return &provider.ListContainerResponse{ - Status: status.NewOK(ctx), - Infos: infos, - }, nil - } - - if ri.Type != provider.ResourceType_RESOURCE_TYPE_CONTAINER { - err := errtypes.NotSupported("gateway: list container: cannot list non-container type:" + ri.Path) - log.Err(err).Msg("gateway: error listing") - return &provider.ListContainerResponse{ - Status: status.NewInvalidArg(ctx, "resource is not a container"), - }, nil - } - - newReq := 
&provider.ListContainerRequest{ - Ref: &provider.Reference{Path: ri.Path}, - ArbitraryMetadataKeys: req.ArbitraryMetadataKeys, - } - newRes, err := s.listContainer(ctx, newReq) - if err != nil { - return &provider.ListContainerResponse{ - Status: status.NewInternal(ctx, err, "gateway: error listing "+newReq.Ref.String()), - }, nil - } - - if newRes.Status.Code != rpc.Code_CODE_OK { - return &provider.ListContainerResponse{ - Status: newRes.Status, - }, nil - } - - // paths needs to be converted - for _, info := range newRes.Infos { - base := path.Base(info.Path) - info.Path = path.Join(p, base) - } - - return newRes, nil - } - - if s.isShareChild(ctx, p) { - shareName, shareChild := s.splitShare(ctx, p) - - statReq := &provider.StatRequest{Ref: &provider.Reference{Path: shareName}} - statRes, err := s.stat(ctx, statReq) - if err != nil { - return &provider.ListContainerResponse{ - Status: status.NewInternal(ctx, err, "gateway: error stating share child "+statReq.Ref.String()), - }, nil - } - - if statRes.Status.Code != rpc.Code_CODE_OK { - return &provider.ListContainerResponse{ - Status: statRes.Status, - }, nil - } - - ri, protocol, err := s.checkRef(ctx, statRes.Info) - if err != nil { - return &provider.ListContainerResponse{ - Status: status.NewStatusFromErrType(ctx, "error resolving reference "+statRes.Info.Target, err), - }, nil - } - - if protocol == "webdav" { - infos, err := s.webdavRefLs(ctx, statRes.Info.Target, shareChild) - if err != nil { - return &provider.ListContainerResponse{ - Status: status.NewInternal(ctx, err, "gateway: error listing webdav reference: "+p), - }, nil - } - - for _, info := range infos { - base := path.Base(info.Path) - info.Path = path.Join(shareName, shareChild, base) - } - return &provider.ListContainerResponse{ - Status: status.NewOK(ctx), - Infos: infos, - }, nil - } - - if ri.Type != provider.ResourceType_RESOURCE_TYPE_CONTAINER { - err := errtypes.NotSupported("gateway: list container: cannot list non-container type:" + 
ri.Path) - log.Err(err).Msg("gateway: error listing") - return &provider.ListContainerResponse{ - Status: status.NewInvalidArg(ctx, "resource is not a container"), - }, nil - } - - newReq := &provider.ListContainerRequest{ - Ref: &provider.Reference{Path: path.Join(ri.Path, shareChild)}, - ArbitraryMetadataKeys: req.ArbitraryMetadataKeys, - } - newRes, err := s.listContainer(ctx, newReq) - if err != nil { - return &provider.ListContainerResponse{ - Status: status.NewInternal(ctx, err, "gateway: error listing "+newReq.Ref.String()), - }, nil - } - - if newRes.Status.Code != rpc.Code_CODE_OK { - return &provider.ListContainerResponse{ - Status: newRes.Status, - }, nil - } - - // paths needs to be converted - for _, info := range newRes.Infos { - base := path.Base(info.Path) - info.Path = path.Join(shareName, shareChild, base) - } - - return newRes, nil - } - - panic("gateway: stating an unknown path:" + p) + return s.listContainer(ctx, req) } func (s *svc) getPath(ctx context.Context, ref *provider.Reference, keys ...string) (string, *rpc.Status) { @@ -1923,12 +849,6 @@ func (s *svc) splitPath(_ context.Context, p string) []string { return strings.SplitN(p, "/", 4) // ["home", "MyShares", "photos", "Ibiza/beach.png"] } -func (s *svc) getSharedFolder(ctx context.Context) string { - home := s.getHome(ctx) - shareFolder := path.Join(home, s.c.ShareFolder) - return shareFolder -} - func (s *svc) CreateSymlink(ctx context.Context, req *provider.CreateSymlinkRequest) (*provider.CreateSymlinkResponse, error) { return &provider.CreateSymlinkResponse{ Status: status.NewUnimplemented(ctx, errtypes.NotSupported("CreateSymlink not implemented"), "CreateSymlink not implemented"), diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index fe90c334b4..f1c4a7d75a 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -175,6 
+175,10 @@ func New(ctx context.Context, m map[string]interface{}) (rgrpc.Service, error) { return nil, err } + if fs == nil { + return nil, errors.New("error creating fs driver") + } + // parse data server url u, err := url.Parse(c.DataServerURL) if err != nil { @@ -924,17 +928,6 @@ func (s *service) ListContainerStream(req *provider.ListContainerStreamRequest, func (s *service) ListContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) { newRef, err := s.unwrap(ctx, req.Ref) - if err != nil { - // The path might be a virtual view; handle that case - if utils.IsAbsolutePathReference(req.Ref) && strings.HasPrefix(s.mountPath, req.Ref.Path) { - return s.listVirtualView(ctx, req.Ref) - } - - return &provider.ListContainerResponse{ - Status: status.NewInternal(ctx, err, "error unwrapping path"), - }, nil - } - mds, err := s.storage.ListFolder(ctx, newRef, req.ArbitraryMetadataKeys) if err != nil { var st *rpc.Status @@ -1551,7 +1544,8 @@ func (s *service) unwrap(ctx context.Context, ref *provider.Reference) (*provide func (s *service) trimMountPrefix(fn string) (string, error) { if strings.HasPrefix(fn, s.mountPath) { - return path.Join("/", strings.TrimPrefix(fn, s.mountPath)), nil + p := path.Join("/", strings.TrimPrefix(fn, s.mountPath)) + return p, nil } return "", errtypes.BadRequest(fmt.Sprintf("path=%q does not belong to this storage provider mount path=%q", fn, s.mountPath)) } diff --git a/internal/http/services/datagateway/datagateway.go b/internal/http/services/datagateway/datagateway.go index 605cd5cc88..7f4029d3ef 100644 --- a/internal/http/services/datagateway/datagateway.go +++ b/internal/http/services/datagateway/datagateway.go @@ -322,7 +322,6 @@ func (s *svc) doPut(w http.ResponseWriter, r *http.Request) { return } httpReq.Header = r.Header - httpRes, err := httpClient.Do(httpReq) if err != nil { log.Err(err).Msg("error doing PUT request to data service") diff --git 
a/internal/http/services/owncloud/ocdav/ocdav.go b/internal/http/services/owncloud/ocdav/ocdav.go index 4e996c1ee4..0b14453dfb 100644 --- a/internal/http/services/owncloud/ocdav/ocdav.go +++ b/internal/http/services/owncloud/ocdav/ocdav.go @@ -30,7 +30,6 @@ import ( "time" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" - userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" @@ -42,7 +41,6 @@ import ( "github.com/cs3org/reva/pkg/sharedconf" "github.com/cs3org/reva/pkg/storage/favorite" "github.com/cs3org/reva/pkg/storage/favorite/registry" - "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/cs3org/reva/pkg/utils/cfg" "github.com/pkg/errors" ) @@ -291,19 +289,22 @@ func (s *svc) getClient() (gateway.GatewayAPIClient, error) { } func applyLayout(ctx context.Context, ns string, useLoggedInUserNS bool, requestPath string) string { + return ns // If useLoggedInUserNS is false, that implies that the request is coming from // the FilesHandler method invoked by a /dav/files/fileOwner where fileOwner // is not the same as the logged in user. In that case, we'll treat fileOwner // as the username whose files are to be accessed and use that in the // namespace template. 
- u, ok := appctx.ContextGetUser(ctx) - if !ok || !useLoggedInUserNS { - requestUserID, _ := router.ShiftPath(requestPath) - u = &userpb.User{ - Username: requestUserID, + /* + u, ok := appctx.ContextGetUser(ctx) + if !ok || !useLoggedInUserNS { + requestUserID, _ := router.ShiftPath(requestPath) + u = &userpb.User{ + Username: requestUserID, + } } - } - return templates.WithUser(u, ns) + return templates.WithUser(u, ns) + */ } func addAccessHeaders(w http.ResponseWriter, r *http.Request) { diff --git a/internal/http/services/owncloud/ocdav/propfind.go b/internal/http/services/owncloud/ocdav/propfind.go index d762f63edf..0ef6544c00 100644 --- a/internal/http/services/owncloud/ocdav/propfind.go +++ b/internal/http/services/owncloud/ocdav/propfind.go @@ -260,6 +260,7 @@ func (s *svc) getResourceInfos(ctx context.Context, w http.ResponseWriter, r *ht Ref: ref, ArbitraryMetadataKeys: metadataKeys, } + res, err := client.Stat(ctx, req) if err != nil { log.Error().Err(err).Interface("req", req).Msg("error sending a grpc stat request") diff --git a/internal/http/services/owncloud/ocs/data/capabilities.go b/internal/http/services/owncloud/ocs/data/capabilities.go index 7fc6dcaedf..18a93c8676 100644 --- a/internal/http/services/owncloud/ocs/data/capabilities.go +++ b/internal/http/services/owncloud/ocs/data/capabilities.go @@ -53,7 +53,7 @@ type Capabilities struct { Core *CapabilitiesCore `json:"core" xml:"core"` Checksums *CapabilitiesChecksums `json:"checksums" xml:"checksums"` Files *CapabilitiesFiles `json:"files" mapstructure:"files" xml:"files"` - Dav *CapabilitiesDav `json:"dav" xml:"dav"` + Dav *CapabilitiesDav `json:"dav" mapstructure:"dav" xml:"dav"` FilesSharing *CapabilitiesFilesSharing `json:"files_sharing" mapstructure:"files_sharing" xml:"files_sharing"` Spaces *Spaces `json:"spaces,omitempty" mapstructure:"spaces" xml:"spaces,omitempty"` diff --git a/internal/http/services/owncloud/ocs/handlers/cloud/capabilities/uploads.go 
b/internal/http/services/owncloud/ocs/handlers/cloud/capabilities/uploads.go index 26d3ddfb6c..014575a92d 100644 --- a/internal/http/services/owncloud/ocs/handlers/cloud/capabilities/uploads.go +++ b/internal/http/services/owncloud/ocs/handlers/cloud/capabilities/uploads.go @@ -49,32 +49,41 @@ func (h *Handler) getCapabilitiesForUserAgent(_ context.Context, userAgent strin } func setCapabilitiesForChunkProtocol(cp chunkProtocol, c *data.Capabilities) { - switch cp { - case chunkV1: - // 2.7+ will use Chunking V1 if "capabilities > files > bigfilechunking" is "true" AND "capabilities > dav > chunking" is not there - c.Files.BigFileChunking = true - c.Dav = nil - c.Files.TusSupport = nil + // 2.7+ will use Chunking V1 if "capabilities > files > bigfilechunking" is "true" AND "capabilities > dav > chunking" is not there + c.Files.BigFileChunking = true + c.Dav = nil + c.Files.TusSupport = nil + /* + switch cp { + case chunkV1: - case chunkNG: - // 2.7+ will use Chunking NG if "capabilities > files > bigfilechunking" is "true" AND "capabilities > dav > chunking" = 1.0 - c.Files.BigFileChunking = true - c.Dav.Chunking = "1.0" - c.Files.TusSupport = nil + // 2.7+ will use Chunking V1 if "capabilities > files > bigfilechunking" is "true" AND "capabilities > dav > chunking" is not there + c.Files.BigFileChunking = true + c.Dav = nil + c.Files.TusSupport = nil - case chunkTUS: - // 2.7+ will use TUS if "capabilities > files > bigfilechunking" is "false" AND "capabilities > dav > chunking" = "" AND "capabilities > files > tus_support" has proper entries. 
- c.Files.BigFileChunking = false - c.Dav.Chunking = "" + case chunkNG: - // TODO: infer from various TUS handlers from all known storages - // until now we take the manually configured tus options - // c.Capabilities.Files.TusSupport = &data.CapabilitiesFilesTusSupport{ - // Version: "1.0.0", - // Resumable: "1.0.0", - // Extension: "creation,creation-with-upload", - // MaxChunkSize: 0, - // HTTPMethodOverride: "", - // } - } + // 2.7+ will use Chunking NG if "capabilities > files > bigfilechunking" is "true" AND "capabilities > dav > chunking" = 1.0 + c.Files.BigFileChunking = true + c.Dav.Chunking = "1.0" + c.Files.TusSupport = nil + + case chunkTUS: + + // 2.7+ will use TUS if "capabilities > files > bigfilechunking" is "false" AND "capabilities > dav > chunking" = "" AND "capabilities > files > tus_support" has proper entries. + c.Files.BigFileChunking = false + c.Dav.Chunking = "" + + // TODO: infer from various TUS handlers from all known storages + // until now we take the manually configured tus options + // c.Capabilities.Files.TusSupport = &data.CapabilitiesFilesTusSupport{ + // Version: "1.0.0", + // Resumable: "1.0.0", + // Extension: "creation,creation-with-upload", + // MaxChunkSize: 0, + // HTTPMethodOverride: "", + // } + } + */ } diff --git a/pkg/auth/manager/ldap/ldap.go b/pkg/auth/manager/ldap/ldap.go index 16374226e0..7bfe436564 100644 --- a/pkg/auth/manager/ldap/ldap.go +++ b/pkg/auth/manager/ldap/ldap.go @@ -124,7 +124,7 @@ func (am *mgr) Authenticate(ctx context.Context, clientID, clientSecret string) log := appctx.GetLogger(ctx) l, err := utils.GetLDAPConnection(&am.c.LDAPConn) if err != nil { - return nil, nil, err + return nil, nil, errors.Wrap(err, "error creating ldap connection") } defer l.Close() @@ -139,10 +139,10 @@ func (am *mgr) Authenticate(ctx context.Context, clientID, clientSecret string) sr, err := l.Search(searchRequest) if err != nil { - return nil, nil, err + return nil, nil, errors.Wrapf(err, "error searching. 
searchrequest = %+v", searchRequest) } - log.Trace().Interface("entries", sr.Entries).Send() + log.Debug().Interface("entries", sr.Entries).Send() if len(sr.Entries) != 1 { return nil, nil, errtypes.NotFound(clientID) } @@ -153,7 +153,7 @@ func (am *mgr) Authenticate(ctx context.Context, clientID, clientSecret string) err = l.Bind(userdn, clientSecret) if err != nil { log.Debug().Err(err).Interface("userdn", userdn).Msg("bind with user credentials failed") - return nil, nil, err + return nil, nil, errors.Wrapf(err, "error binding with user credentials for user %s", userdn) } userID := &user.UserId{ @@ -193,7 +193,7 @@ func (am *mgr) Authenticate(ctx context.Context, clientID, clientSecret string) u := &user.User{ Id: userID, // TODO add more claims from the StandardClaims, eg EmailVerified - Username: sr.Entries[0].GetEqualFoldAttributeValue(am.c.Schema.CN), + Username: sr.Entries[0].GetEqualFoldAttributeValue(am.c.Schema.UID), // TODO groups Groups: getGroupsResp.Groups, Mail: sr.Entries[0].GetEqualFoldAttributeValue(am.c.Schema.Mail), diff --git a/pkg/storage/fs/cephfs/cephfs.go b/pkg/storage/fs/cephfs/cephfs.go index 1f32df6bb2..0efdeb069f 100644 --- a/pkg/storage/fs/cephfs/cephfs.go +++ b/pkg/storage/fs/cephfs/cephfs.go @@ -48,14 +48,8 @@ import ( ) const ( - xattrTrustedNs = "trusted." - xattrEID = xattrTrustedNs + "eid" - xattrMd5 = xattrTrustedNs + "checksum" - xattrMd5ts = xattrTrustedNs + "checksumTS" - xattrRef = xattrTrustedNs + "ref" - xattrUserNs = "user." - snap = ".snap" - xattrLock = xattrUserNs + "reva.lockpayload" + xattrUserNs = "user." + xattrLock = xattrUserNs + "reva.lockpayload" ) type cephfs struct { @@ -69,8 +63,8 @@ func init() { registry.Register("cephfs", New) } -// New returns an implementation to of the storage.FS interface that talks to -// a ceph filesystem. +// New returns an implementation of the storage.FS interface that talks to +// a CephFS storage via libcephfs. 
func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err error) { var o Options if err := cfg.Decode(m, &o); err != nil { @@ -82,19 +76,9 @@ func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err erro return nil, errors.New("cephfs: can't create caches") } - adminConn := newAdminConn(&o) - if adminConn == nil { - return nil, errors.Wrap(err, "cephfs: Couldn't create admin connections") - } - - for _, dir := range []string{o.ShadowFolder, o.UploadFolder} { - _, err := adminConn.adminMount.Statx(dir, goceph.StatxMask(goceph.StatxIno), 0) - if err != nil { - err = adminConn.adminMount.MakeDir(dir, dirPermFull) - if err != nil && err.Error() != errFileExists { - return nil, errors.New("cephfs: can't initialise system dir " + dir + ":" + err.Error()) - } - } + adminConn, err := newAdminConn(&o) + if err != nil { + return nil, errors.Wrap(err, "cephfs: couldn't create admin connections") } return &cephfs{ @@ -105,70 +89,66 @@ func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err erro } func (fs *cephfs) GetHome(ctx context.Context) (string, error) { - if fs.conf.DisableHome { - return "", errtypes.NotSupported("cephfs: GetHome() home supported disabled") - } - + log := appctx.GetLogger(ctx) user := fs.makeUser(ctx) - + log.Debug().Interface("user", user).Msg("GetHome for user") return user.home, nil } func (fs *cephfs) CreateHome(ctx context.Context) (err error) { - if fs.conf.DisableHome { - return errtypes.NotSupported("cephfs: GetHome() home supported disabled") - } + log := appctx.GetLogger(ctx) user := fs.makeUser(ctx) + log.Debug().Interface("user", user).Msg("CreateHome for user") - // Stop createhome from running the whole thing because it is called multiple times - if _, err = fs.adminConn.adminMount.Statx(user.home, goceph.StatxMode, 0); err == nil { - return + // Skip home creation if the directory already exists. 
+ // We do not check for all necessary attributes, only for the existence of the directory. + stat, err := fs.adminConn.adminMount.Statx(user.home, goceph.StatxMode, 0) + if err == nil { + log.Debug().Interface("stat", stat).Msgf("home %s already exists, skipping creation", user.home) + return nil + } + + // The home directory does not exist (stat failed, typically with "no such file or directory"): + // create the full path below and set ownership and quota. + err = walkPath(user.home, func(path string) error { return fs.adminConn.adminMount.MakeDir(path, fs.conf.DirPerms) }, false) if err != nil { - return getRevaError(err) + return getRevaError(ctx, err) } err = fs.adminConn.adminMount.Chown(user.home, uint32(user.UidNumber), uint32(user.GidNumber)) if err != nil { - return getRevaError(err) + return getRevaError(ctx, err) } err = fs.adminConn.adminMount.SetXattr(user.home, "ceph.quota.max_bytes", []byte(fmt.Sprint(fs.conf.UserQuotaBytes)), 0) if err != nil { - return getRevaError(err) + return getRevaError(ctx, err) } - user.op(func(cv *cacheVal) { - err = cv.mount.MakeDir(removeLeadingSlash(fs.conf.ShareFolder), fs.conf.DirPerms) - if err != nil && err.Error() == errFileExists { - err = nil - } - }) - - return getRevaError(err) + return nil } func (fs *cephfs) CreateDir(ctx context.Context, ref *provider.Reference) error { user := fs.makeUser(ctx) path, err := user.resolveRef(ref) if err != nil { - return getRevaError(err) + return getRevaError(ctx, err) } + log := appctx.GetLogger(ctx) user.op(func(cv *cacheVal) { if err = cv.mount.MakeDir(path, fs.conf.DirPerms); err != nil { + log.Debug().Str("path", path).Err(err).Msg("cv.mount.CreateDir returned") return } - - //TODO(tmourati): Add entry id logic }) - return getRevaError(err) + return getRevaError(ctx, err) } func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err error) { @@ -179,20 +159,22 @@ func (fs *cephfs) Delete(ctx context.Context, ref 
*provider.Reference) (err erro return err } + log := appctx.GetLogger(ctx) user.op(func(cv *cacheVal) { if err = cv.mount.Unlink(path); err != nil && err.Error() == errIsADirectory { err = cv.mount.RemoveDir(path) } - - //TODO(tmourati): Add entry id logic }) - //has already been deleted by direct mount - if err != nil && err.Error() == errNotFound { - return nil + if err != nil { + log.Debug().Any("ref", ref).Err(err).Msg("Delete returned") + if err.Error() == errNotFound { + //has already been deleted by direct mount + return nil + } } - return getRevaError(err) + return getRevaError(ctx, err) } func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference) (err error) { @@ -205,12 +187,12 @@ func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference) return } + log := appctx.GetLogger(ctx) user.op(func(cv *cacheVal) { if err = cv.mount.Rename(oldPath, newPath); err != nil { + log.Debug().Any("oldRef", oldRef).Any("newRef", newRef).Err(err).Msg("cv.mount.Rename returned") return } - - //TODO(tmourati): Add entry id logic, handle already moved file error }) // has already been moved by direct mount @@ -218,13 +200,17 @@ func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference) return nil } - return getRevaError(err) + return getRevaError(ctx, err) } func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []string) (ri *provider.ResourceInfo, err error) { + if ref == nil { + return nil, errors.New("error: ref is nil") + } + + log := appctx.GetLogger(ctx) var path string user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { return nil, err } @@ -232,37 +218,43 @@ func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []s user.op(func(cv *cacheVal) { var stat Statx if stat, err = cv.mount.Statx(path, goceph.StatxBasicStats, 0); err != nil { + log.Debug().Str("path", path).Err(err).Msg("cv.mount.Statx returned") return } ri, err = 
user.fileAsResourceInfo(cv, path, stat, mdKeys) + if err != nil { + log.Debug().Any("resourceInfo", ri).Err(err).Msg("fileAsResourceInfo returned") + } }) - return ri, getRevaError(err) + return ri, getRevaError(ctx, err) } func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys []string) (files []*provider.ResourceInfo, err error) { - var path string - user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { - return + if ref == nil { + return nil, errors.New("error: ref is nil") } - // The user wants to access their home, create it if it doesn't exist - if path == fs.conf.Root { - if err = fs.CreateHome(ctx); err != nil { - return - } + log := appctx.GetLogger(ctx) + log.Debug().Interface("ref", ref).Msg("ListFolder") + user := fs.makeUser(ctx) + + var path string + if path, err = user.resolveRef(ref); err != nil { + return nil, err } user.op(func(cv *cacheVal) { var dir *goceph.Directory if dir, err = cv.mount.OpenDir(path); err != nil { + log.Debug().Str("path", path).Err(err).Msg("cv.mount.OpenDir returned") return } defer closeDir(dir) var entry *goceph.DirEntryPlus var ri *provider.ResourceInfo + for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) { if fs.conf.HiddenDirs[entry.Name()] { continue @@ -271,8 +263,7 @@ func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKey ri, err = user.fileAsResourceInfo(cv, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys) if ri == nil || err != nil { if err != nil { - log := appctx.GetLogger(ctx) - log.Err(err).Msg("cephfs: error in file as resource info") + log.Debug().Any("resourceInfo", ri).Err(err).Msg("fileAsResourceInfo returned") } err = nil continue @@ -282,7 +273,7 @@ func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKey } }) - return files, getRevaError(err) + return files, getRevaError(ctx, err) } func (fs *cephfs) Download(ctx 
context.Context, ref *provider.Reference) (rc io.ReadCloser, err error) { @@ -292,116 +283,31 @@ func (fs *cephfs) Download(ctx context.Context, ref *provider.Reference) (rc io. return nil, errors.Wrap(err, "cephfs: error resolving ref") } + log := appctx.GetLogger(ctx) user.op(func(cv *cacheVal) { - if strings.HasPrefix(strings.TrimPrefix(path, user.home), fs.conf.ShareFolder) { - err = errtypes.PermissionDenied("cephfs: cannot download under the virtual share folder") + if rc, err = cv.mount.Open(path, os.O_RDONLY, 0); err != nil { + log.Debug().Any("ref", ref).Err(err).Msg("cv.mount.Open returned") return } - rc, err = cv.mount.Open(path, os.O_RDONLY, 0) }) - return rc, getRevaError(err) + return rc, getRevaError(ctx, err) } func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) (fvs []*provider.FileVersion, err error) { - //TODO(tmourati): Fix entry id logic - var path string - user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { - return nil, errors.Wrap(err, "cephfs: error resolving ref") - } - - user.op(func(cv *cacheVal) { - if strings.HasPrefix(path, removeLeadingSlash(fs.conf.ShareFolder)) { - err = errtypes.PermissionDenied("cephfs: cannot download under the virtual share folder") - return - } - var dir *goceph.Directory - if dir, err = cv.mount.OpenDir(".snap"); err != nil { - return - } - defer closeDir(dir) - - for d, _ := dir.ReadDir(); d != nil; d, _ = dir.ReadDir() { - var revPath string - var stat Statx - var e error - - if strings.HasPrefix(d.Name(), ".") { - continue - } - - revPath, e = resolveRevRef(cv.mount, ref, d.Name()) - if e != nil { - continue - } - stat, e = cv.mount.Statx(revPath, goceph.StatxMtime|goceph.StatxSize, 0) - if e != nil { - continue - } - fvs = append(fvs, &provider.FileVersion{ - Key: d.Name(), - Size: stat.Size, - Mtime: uint64(stat.Mtime.Sec), - }) - } - }) - - return fvs, getRevaError(err) + return nil, errtypes.NotSupported("cephfs: RestoreRevision not supported") } func 
(fs *cephfs) DownloadRevision(ctx context.Context, ref *provider.Reference, key string) (file io.ReadCloser, err error) { - //TODO(tmourati): Fix entry id logic - user := fs.makeUser(ctx) - - user.op(func(cv *cacheVal) { - var revPath string - revPath, err = resolveRevRef(cv.mount, ref, key) - if err != nil { - return - } - - file, err = cv.mount.Open(revPath, os.O_RDONLY, 0) - }) - - return file, getRevaError(err) + return nil, errtypes.NotSupported("cephfs: RestoreRevision not supported") } func (fs *cephfs) RestoreRevision(ctx context.Context, ref *provider.Reference, key string) (err error) { - //TODO(tmourati): Fix entry id logic - var path string - user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { - return errors.Wrap(err, "cephfs: error resolving ref") - } - - user.op(func(cv *cacheVal) { - var revPath string - if revPath, err = resolveRevRef(cv.mount, ref, key); err != nil { - err = errors.Wrap(err, "cephfs: error resolving revision ref "+ref.String()) - return - } - - var src, dst *goceph.File - if src, err = cv.mount.Open(revPath, os.O_RDONLY, 0); err != nil { - return - } - defer closeFile(src) - - if dst, err = cv.mount.Open(path, os.O_WRONLY|os.O_TRUNC, 0); err != nil { - return - } - defer closeFile(dst) - - _, err = io.Copy(dst, src) - }) - - return getRevaError(err) + return errtypes.NotSupported("cephfs: RestoreRevision not supported") } func (fs *cephfs) GetPathByID(ctx context.Context, id *provider.ResourceId) (str string, err error) { - //TODO(tmourati): Add entry id logic - return "", errtypes.NotSupported("cephfs: entry IDs currently not supported") + return "", errtypes.NotSupported("cephfs: ids currently not supported") } func (fs *cephfs) AddGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { @@ -411,11 +317,14 @@ func (fs *cephfs) AddGrant(ctx context.Context, ref *provider.Reference, g *prov return } + log := appctx.GetLogger(ctx) user.op(func(cv *cacheVal) { - err = 
fs.changePerms(ctx, cv.mount, g, path, updateGrant) + if err = fs.changePerms(ctx, cv.mount, g, path, updateGrant); err != nil { + log.Debug().Any("ref", ref).Any("grant", g).Err(err).Msg("AddGrant returned") + } }) - return getRevaError(err) + return getRevaError(ctx, err) } func (fs *cephfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { @@ -425,11 +334,14 @@ func (fs *cephfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *p return } + log := appctx.GetLogger(ctx) user.op(func(cv *cacheVal) { - err = fs.changePerms(ctx, cv.mount, g, path, removeGrant) + if err = fs.changePerms(ctx, cv.mount, g, path, removeGrant); err != nil { + log.Debug().Any("ref", ref).Any("grant", g).Err(err).Msg("RemoveGrant returned") + } }) - return getRevaError(err) + return getRevaError(ctx, err) } func (fs *cephfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { @@ -439,11 +351,14 @@ func (fs *cephfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *p return } + log := appctx.GetLogger(ctx) user.op(func(cv *cacheVal) { - err = fs.changePerms(ctx, cv.mount, g, path, updateGrant) + if err = fs.changePerms(ctx, cv.mount, g, path, updateGrant); err != nil { + log.Debug().Any("ref", ref).Any("grant", g).Err(err).Msg("UpdateGrant returned") + } }) - return getRevaError(err) + return getRevaError(ctx, err) } func (fs *cephfs) DenyGrant(ctx context.Context, ref *provider.Reference, g *provider.Grantee) (err error) { @@ -453,12 +368,15 @@ func (fs *cephfs) DenyGrant(ctx context.Context, ref *provider.Reference, g *pro return } + log := appctx.GetLogger(ctx) user.op(func(cv *cacheVal) { grant := &provider.Grant{Grantee: g} //nil perms will remove the whole grant - err = fs.changePerms(ctx, cv.mount, grant, path, removeGrant) + if err = fs.changePerms(ctx, cv.mount, grant, path, removeGrant); err != nil { + log.Debug().Any("ref", ref).Any("grant", grant).Err(err).Msg("DenyGrant 
returned") + } }) - return getRevaError(err) + return getRevaError(ctx, err) } func (fs *cephfs) ListGrants(ctx context.Context, ref *provider.Reference) (glist []*provider.Grant, err error) { @@ -476,7 +394,7 @@ func (fs *cephfs) ListGrants(ctx context.Context, ref *provider.Reference) (glis } }) - return glist, getRevaError(err) + return glist, getRevaError(ctx, err) } func (fs *cephfs) GetQuota(ctx context.Context, ref *provider.Reference) (total uint64, used uint64, err error) { @@ -499,28 +417,11 @@ func (fs *cephfs) GetQuota(ctx context.Context, ref *provider.Reference) (total } }) - return total, used, getRevaError(err) + return total, used, getRevaError(ctx, err) } func (fs *cephfs) CreateReference(ctx context.Context, path string, targetURI *url.URL) (err error) { - user := fs.makeUser(ctx) - - user.op(func(cv *cacheVal) { - if !strings.HasPrefix(strings.TrimPrefix(path, user.home), fs.conf.ShareFolder) { - err = errors.New("cephfs: can't create reference outside a share folder") - } else { - err = cv.mount.MakeDir(path, fs.conf.DirPerms) - } - }) - if err != nil { - return getRevaError(err) - } - - user.op(func(cv *cacheVal) { - err = cv.mount.SetXattr(path, xattrRef, []byte(targetURI.String()), 0) - }) - - return getRevaError(err) + return errors.New("error: CreateReference not implemented") } func (fs *cephfs) Shutdown(ctx context.Context) (err error) { @@ -540,6 +441,7 @@ func (fs *cephfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Refere return err } + log := appctx.GetLogger(ctx) user.op(func(cv *cacheVal) { for k, v := range md.Metadata { if !strings.HasPrefix(k, xattrUserNs) { @@ -547,12 +449,13 @@ func (fs *cephfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Refere } if e := cv.mount.SetXattr(path, k, []byte(v), 0); e != nil { err = errors.Wrap(err, e.Error()) + log.Debug().Any("ref", ref).Str("key", k).Any("v", v).Err(err).Msg("SetXattr returned") return } } }) - return getRevaError(err) + return getRevaError(ctx, err) } 
func (fs *cephfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) (err error) { @@ -562,6 +465,7 @@ func (fs *cephfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Refe return err } + log := appctx.GetLogger(ctx) user.op(func(cv *cacheVal) { for _, key := range keys { if !strings.HasPrefix(key, xattrUserNs) { @@ -569,32 +473,33 @@ func (fs *cephfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Refe } if e := cv.mount.RemoveXattr(path, key); e != nil { err = errors.Wrap(err, e.Error()) + log.Debug().Any("ref", ref).Str("key", key).Err(err).Msg("RemoveXattr returned") return } } }) - return getRevaError(err) + return getRevaError(ctx, err) } func (fs *cephfs) TouchFile(ctx context.Context, ref *provider.Reference) error { user := fs.makeUser(ctx) path, err := user.resolveRef(ref) if err != nil { - return getRevaError(err) + return getRevaError(ctx, err) } + log := appctx.GetLogger(ctx) user.op(func(cv *cacheVal) { var file *goceph.File defer closeFile(file) if file, err = cv.mount.Open(path, os.O_CREATE|os.O_WRONLY, fs.conf.FilePerms); err != nil { + log.Debug().Any("ref", ref).Err(err).Msg("Touch: Open returned") return } - - //TODO(tmourati): Add entry id logic }) - return getRevaError(err) + return getRevaError(ctx, err) } func (fs *cephfs) EmptyRecycle(ctx context.Context) error { @@ -656,7 +561,7 @@ func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *pr user := fs.makeUser(ctx) path, err := user.resolveRef(ref) if err != nil { - return getRevaError(err) + return getRevaError(ctx, err) } op := goceph.LockEX @@ -684,14 +589,14 @@ func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *pr return fs.SetArbitraryMetadata(ctx, ref, md) } - return getRevaError(err) + return getRevaError(ctx, err) } func (fs *cephfs) GetLock(ctx context.Context, ref *provider.Reference) (*provider.Lock, error) { user := fs.makeUser(ctx) path, err := user.resolveRef(ref) if err != 
nil { - return nil, getRevaError(err) + return nil, getRevaError(ctx, err) } var l *provider.Lock @@ -744,7 +649,7 @@ func (fs *cephfs) GetLock(ctx context.Context, ref *provider.Reference) (*provid return }) - return l, getRevaError(err) + return l, getRevaError(ctx, err) } // TODO(lopresti) part of this logic is duplicated from eosfs.go, should be factored out @@ -787,7 +692,7 @@ func (fs *cephfs) Unlock(ctx context.Context, ref *provider.Reference, lock *pro user := fs.makeUser(ctx) path, err := user.resolveRef(ref) if err != nil { - return getRevaError(err) + return getRevaError(ctx, err) } oldLock, err := fs.GetLock(ctx, ref) @@ -824,5 +729,5 @@ func (fs *cephfs) Unlock(ctx context.Context, ref *provider.Reference, lock *pro return fs.UnsetArbitraryMetadata(ctx, ref, []string{xattrLock}) } - return getRevaError(err) + return getRevaError(ctx, err) } diff --git a/pkg/storage/fs/cephfs/chunking.go b/pkg/storage/fs/cephfs/chunking.go index a89fa23824..1b5be2b4b0 100644 --- a/pkg/storage/fs/cephfs/chunking.go +++ b/pkg/storage/fs/cephfs/chunking.go @@ -26,6 +26,7 @@ import ( "fmt" "io" "os" + "path" "path/filepath" "regexp" "strconv" @@ -43,11 +44,12 @@ func IsChunked(fn string) (bool, error) { } // ChunkBLOBInfo stores info about a particular chunk +// example: given /users/peter/myfile.txt-chunking-1234-10-2 type ChunkBLOBInfo struct { - Path string - TransferID string - TotalChunks int - CurrentChunk int + Path string // example: /users/peter/myfile.txt + TransferID string // example: 1234 + TotalChunks int // example: 10 + CurrentChunk int // example: 2 } // Not using the resource path in the chunk folder name allows uploading to @@ -85,21 +87,22 @@ func GetChunkBLOBInfo(path string) (*ChunkBLOBInfo, error) { // ChunkHandler manages chunked uploads, storing the chunks in a temporary directory // until it gets the final chunk which is then returned. 
type ChunkHandler struct { - user *User - chunkFolder string + user *User + uploadFolder string // example: /users/peter/.uploads } // NewChunkHandler creates a handler for chunked uploads. func NewChunkHandler(ctx context.Context, fs *cephfs) *ChunkHandler { - return &ChunkHandler{fs.makeUser(ctx), fs.conf.UploadFolder} + u := fs.makeUser(ctx) + return &ChunkHandler{u, path.Join(u.home, fs.conf.UploadFolder)} } -func (c *ChunkHandler) getChunkTempFileName() string { +func (c *ChunkHandler) getTempFileName() string { return fmt.Sprintf("__%d_%s", time.Now().Unix(), uuid.New().String()) } -func (c *ChunkHandler) getChunkFolderName(i *ChunkBLOBInfo) (path string, err error) { - path = filepath.Join(c.chunkFolder, i.uploadID()) +func (c *ChunkHandler) getAndCreateTransferFolderName(i *ChunkBLOBInfo) (path string, err error) { + path = filepath.Join(c.uploadFolder, i.uploadID()) c.user.op(func(cv *cacheVal) { err = cv.mount.MakeDir(path, 0777) }) @@ -107,6 +110,7 @@ func (c *ChunkHandler) getChunkFolderName(i *ChunkBLOBInfo) (path string, err er return } +// TODO(labkode): I don't like how this function looks like, better to refactor func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chunk string, err error) { chunkInfo, err := GetChunkBLOBInfo(path) if err != nil { @@ -114,10 +118,21 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu return } - chunkTempFilename := c.getChunkTempFileName() + transferFolderName, err := c.getAndCreateTransferFolderName(chunkInfo) + if err != nil { + // TODO(labkode): skip error for now + // err = fmt.Errorf("error getting transfer folder anme", err) + return + } + + // here we write a temporary file that will be renamed to the transfer folder + // with the correct sequence number filename. + // we do not store this before-rename temporary files inside the transfer folder + // to avoid errors when counting the number of chunks for finalizing the transfer. 
+ tmpFilename := c.getTempFileName() c.user.op(func(cv *cacheVal) { var tmpFile *goceph.File - target := filepath.Join(c.chunkFolder, chunkTempFilename) + target := filepath.Join(c.uploadFolder, tmpFilename) tmpFile, err = cv.mount.Open(target, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms) defer closeFile(tmpFile) if err != nil { @@ -129,15 +144,9 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu return } - chunksFolderName, err := c.getChunkFolderName(chunkInfo) - if err != nil { - return - } - // c.logger.Info().Log("chunkfolder", chunksFolderName) - - chunkTarget := filepath.Join(chunksFolderName, strconv.Itoa(chunkInfo.CurrentChunk)) + chunkTarget := filepath.Join(transferFolderName, strconv.Itoa(chunkInfo.CurrentChunk)) c.user.op(func(cv *cacheVal) { - err = cv.mount.Rename(chunkTempFilename, chunkTarget) + err = cv.mount.Rename(tmpFilename, chunkTarget) }) if err != nil { return @@ -154,7 +163,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu var entry *goceph.DirEntry var chunkFile, assembledFile *goceph.File - dir, err = cv.mount.OpenDir(chunksFolderName) + dir, err = cv.mount.OpenDir(transferFolderName) defer closeDir(dir) for entry, err = dir.ReadDir(); entry != nil && err == nil; entry, err = dir.ReadDir() { @@ -167,16 +176,20 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu return } - chunk = filepath.Join(c.chunkFolder, c.getChunkTempFileName()) - assembledFile, err = cv.mount.Open(chunk, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms) + // from now on we do have all the necessary chunks, + // so we create a temporary file where all the chunks will be written + // before being renamed to the requested location, from the example: /users/peter/myfile.txt + + assemblyFilename := filepath.Join(c.uploadFolder, c.getTempFileName()) + assembledFile, err = cv.mount.Open(assemblyFilename, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms) defer 
closeFile(assembledFile) - defer deleteFile(cv.mount, chunk) + defer deleteFile(cv.mount, assemblyFilename) if err != nil { return } for i := 0; i < numEntries; i++ { - target := filepath.Join(chunksFolderName, strconv.Itoa(i)) + target := filepath.Join(transferFolderName, strconv.Itoa(i)) chunkFile, err = cv.mount.Open(target, os.O_RDONLY, 0) if err != nil { @@ -189,22 +202,22 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu } } - // necessary approach in case assembly fails + // clean all the chunks that made the assembly file for i := 0; i < numEntries; i++ { - target := filepath.Join(chunksFolderName, strconv.Itoa(i)) + target := filepath.Join(transferFolderName, strconv.Itoa(i)) err = cv.mount.Unlink(target) if err != nil { return } } - _ = cv.mount.Unlink(chunksFolderName) }) - - return true, chunk, nil + return } // WriteChunk saves an intermediate chunk temporarily and assembles all chunks // once the final one is received. +// this function will return the original filename (myfile.txt) and the assemblyPath when +// the upload is completed func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, string, error) { finish, chunk, err := c.saveChunk(fn, r) if err != nil { diff --git a/pkg/storage/fs/cephfs/connections.go b/pkg/storage/fs/cephfs/connections.go index 916c6b9a93..d18a288282 100644 --- a/pkg/storage/fs/cephfs/connections.go +++ b/pkg/storage/fs/cephfs/connections.go @@ -116,21 +116,21 @@ type adminConn struct { // radosIO *rados2.IOContext } -func newAdminConn(conf *Options) *adminConn { +func newAdminConn(conf *Options) (*adminConn, error) { rados, err := rados2.NewConnWithUser(conf.ClientID) if err != nil { - return nil + return nil, errors.Wrap(err, "error creating connection with user for client id: "+conf.ClientID) } if err = rados.ReadConfigFile(conf.Config); err != nil { - return nil + return nil, errors.Wrapf(err, "error reading config file %s", conf.Config) } if err = 
rados.SetConfigOption("keyring", conf.Keyring); err != nil { - return nil + return nil, errors.Wrapf(err, "error setting keyring conf: %s", conf.Keyring) } if err = rados.Connect(); err != nil { - return nil + return nil, errors.Wrap(err, "error connecting to rados") } // TODO: May use later for file ids @@ -166,13 +166,13 @@ func newAdminConn(conf *Options) *adminConn { mount, err := goceph.CreateFromRados(rados) if err != nil { rados.Shutdown() - return nil + return nil, errors.Wrap(err, "error calling CreateFromRados") } if err = mount.MountWithRoot(conf.Root); err != nil { rados.Shutdown() destroyCephConn(mount, nil) - return nil + return nil, errors.Wrapf(err, "error mounting with root %s", conf.Root) } return &adminConn{ @@ -181,7 +181,7 @@ func newAdminConn(conf *Options) *adminConn { mount, rados, // radosIO, - } + }, nil } func newConn(user *User) *cacheVal { @@ -203,6 +203,7 @@ func newConn(user *User) *cacheVal { } if user != nil { //nil creates admin conn + // TODO(lopresti) here we may need to impersonate a different user in order to support ACLs! 
perm = goceph.NewUserPerm(int(user.UidNumber), int(user.GidNumber), []int{}) if err = mount.SetMountPerms(perm); err != nil { return destroyCephConn(mount, perm) @@ -213,11 +214,14 @@ func newConn(user *User) *cacheVal { return destroyCephConn(mount, perm) } - if user != nil && !user.fs.conf.DisableHome { - if err = mount.ChangeDir(user.fs.conf.Root); err != nil { - return destroyCephConn(mount, perm) + // TODO(labkode): we leave the mount on the fs root + /* + if user != nil && !user.fs.conf.DisableHome { + if err = mount.ChangeDir(user.fs.conf.Root); err != nil { + return destroyCephConn(mount, perm) + } } - } + */ return &cacheVal{ perm: perm, diff --git a/pkg/storage/fs/cephfs/errors.go b/pkg/storage/fs/cephfs/errors.go index d866e6d837..8182635bea 100644 --- a/pkg/storage/fs/cephfs/errors.go +++ b/pkg/storage/fs/cephfs/errors.go @@ -29,7 +29,8 @@ package cephfs import "C" import ( "fmt" - + "context" + "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" ) @@ -45,10 +46,12 @@ var ( errPermissionDenied = wrapErrorMsg(C.EACCES) ) -func getRevaError(err error) error { +func getRevaError(ctx context.Context, err error) error { if err == nil { return nil } + log := appctx.GetLogger(ctx) + log.Warn().Err(err).Msg("cephfs error") switch err.Error() { case errNotFound: return errtypes.NotFound("cephfs: entry not found") diff --git a/pkg/storage/fs/cephfs/options.go b/pkg/storage/fs/cephfs/options.go index c33e76eb89..a7b6149462 100644 --- a/pkg/storage/fs/cephfs/options.go +++ b/pkg/storage/fs/cephfs/options.go @@ -22,25 +22,19 @@ package cephfs import ( - "path/filepath" - "github.com/cs3org/reva/pkg/sharedconf" ) // Options for the cephfs module type Options struct { - ClientID string `mapstructure:"client_id"` - Config string `mapstructure:"config"` - GatewaySvc string `mapstructure:"gatewaysvc"` - IndexPool string `mapstructure:"index_pool"` - Keyring string `mapstructure:"keyring"` - Root string `mapstructure:"root"` - ShadowFolder string 
`mapstructure:"shadow_folder"` - ShareFolder string `mapstructure:"share_folder"` - UploadFolder string `mapstructure:"uploads"` - UserLayout string `mapstructure:"user_layout"` - - DisableHome bool `mapstructure:"disable_home"` + ClientID string `mapstructure:"client_id"` + Config string `mapstructure:"config"` + GatewaySvc string `mapstructure:"gatewaysvc"` + IndexPool string `mapstructure:"index_pool"` + Keyring string `mapstructure:"keyring"` + Root string `mapstructure:"root"` + UploadFolder string `mapstructure:"uploads"` + UserLayout string `mapstructure:"user_layout"` DirPerms uint32 `mapstructure:"dir_perms"` FilePerms uint32 `mapstructure:"file_perms"` UserQuotaBytes uint64 `mapstructure:"user_quota_bytes"` @@ -71,27 +65,14 @@ func (c *Options) ApplyDefaults() { } if c.Root == "" { - c.Root = "/home" + c.Root = "/cephfs" } else { c.Root = addLeadingSlash(c.Root) } - if c.ShadowFolder == "" { - c.ShadowFolder = "/.reva_hidden" - } else { - c.ShadowFolder = addLeadingSlash(c.ShadowFolder) - } - - if c.ShareFolder == "" { - c.ShareFolder = "/Shares" - } else { - c.ShareFolder = addLeadingSlash(c.ShareFolder) - } - if c.UploadFolder == "" { c.UploadFolder = ".uploads" } - c.UploadFolder = filepath.Join(c.ShadowFolder, c.UploadFolder) if c.UserLayout == "" { c.UserLayout = "{{.Username}}" @@ -100,7 +81,7 @@ func (c *Options) ApplyDefaults() { c.HiddenDirs = map[string]bool{ ".": true, "..": true, - removeLeadingSlash(c.ShadowFolder): true, + removeLeadingSlash(c.UploadFolder): true, } if c.DirPerms == 0 { diff --git a/pkg/storage/fs/cephfs/upload.go b/pkg/storage/fs/cephfs/upload.go index 64ac7f7179..44639881a7 100644 --- a/pkg/storage/fs/cephfs/upload.go +++ b/pkg/storage/fs/cephfs/upload.go @@ -39,46 +39,52 @@ func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read ok, err := IsChunked(p) if err != nil { - return errors.Wrap(err, "cephfs: error checking path") + return errors.Wrap(err, "cephfs: error checking if path is chunked") } - 
if ok { - var assembledFile string - p, assembledFile, err = NewChunkHandler(ctx, fs).WriteChunk(p, r) - if err != nil { - return err - } - if p == "" { - return errtypes.PartialContent(ref.String()) - } + + if !ok { + var file io.WriteCloser user.op(func(cv *cacheVal) { - r, err = cv.mount.Open(assembledFile, os.O_RDONLY, 0) - }) - if err != nil { - return errors.Wrap(err, "cephfs: error opening assembled file") - } - defer r.Close() - defer user.op(func(cv *cacheVal) { - _ = cv.mount.Unlink(assembledFile) + file, err = cv.mount.Open(p, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, fs.conf.FilePerms) + if err != nil { + err = errors.Wrap(err, "cephfs: error opening binary file") + return + } + defer file.Close() + + _, err = io.Copy(file, r) + if err != nil { + err = errors.Wrap(err, "cephfs: error writing to binary file") + return + } }) + + return nil } - var file io.WriteCloser + // upload is chunked + + var assembledFile string + + // iniate the chunk handler + originalFilename, assembledFile, err := NewChunkHandler(ctx, fs).WriteChunk(p, r) + if err != nil { + return errors.Wrapf(err, "error writing chunk %v %v %v", p, r, assembledFile) + } + if originalFilename == "" { // means we wrote a chunk only + return errtypes.PartialContent(ref.String()) + } user.op(func(cv *cacheVal) { - file, err = cv.mount.Open(p, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, fs.conf.FilePerms) - if err != nil { - err = errors.Wrap(err, "cephfs: error opening binary file") - return - } - defer file.Close() - - _, err = io.Copy(file, r) - if err != nil { - err = errors.Wrap(err, "cephfs: error writing to binary file") - return - } + err = cv.mount.Rename(assembledFile, originalFilename) + }) + if err != nil { + return errors.Wrap(err, "cephfs: error renaming assembled file") + } + defer user.op(func(cv *cacheVal) { + _ = cv.mount.Unlink(assembledFile) }) + return nil - return err } func (fs *cephfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata 
map[string]string) (map[string]string, error) { diff --git a/pkg/storage/fs/cephfs/user.go b/pkg/storage/fs/cephfs/user.go index da15b1ec45..a2d3cdeb97 100644 --- a/pkg/storage/fs/cephfs/user.go +++ b/pkg/storage/fs/cephfs/user.go @@ -53,11 +53,8 @@ type User struct { func (fs *cephfs) makeUser(ctx context.Context) *User { u := appctx.ContextMustGetUser(ctx) - home := fs.conf.Root - if !fs.conf.DisableHome { - home = filepath.Join(fs.conf.Root, templates.WithUser(u, fs.conf.UserLayout)) - } - + // home := fs.conf.Root + home := filepath.Join(fs.conf.Root, templates.WithUser(u, fs.conf.UserLayout)) return &User{u, fs, ctx, home} } @@ -139,8 +136,6 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.Cep } } - //TODO(tmourati): Add entry id logic here - var etag string if isDir(_type) { rctime, _ := cv.mount.GetXattr(path, "ceph.dir.rctime") @@ -162,37 +157,11 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.Cep } } + // cephfs does not provide checksums, so we cannot set it + // a 3rd party tool can add a checksum attribute and we can read it, + // if ever that is implemented. 
var checksum provider.ResourceChecksum - var md5 string - if _type == provider.ResourceType_RESOURCE_TYPE_FILE { - md5tsBA, err := cv.mount.GetXattr(path, xattrMd5ts) //local error inside if scope - if err == nil { - md5ts, _ := strconv.ParseInt(string(md5tsBA), 10, 64) - if stat.Mtime.Sec == md5ts { - md5BA, err := cv.mount.GetXattr(path, xattrMd5) - if err != nil { - md5, err = calcChecksum(path, cv.mount, stat) - } else { - md5 = string(md5BA) - } - } else { - md5, err = calcChecksum(path, cv.mount, stat) - } - } else { - md5, err = calcChecksum(path, cv.mount, stat) - } - - if err != nil && err.Error() == errPermissionDenied { - checksum.Type = provider.ResourceChecksumType_RESOURCE_CHECKSUM_TYPE_UNSET - } else if err != nil { - return nil, errors.New("cephfs: error calculating checksum of file") - } else { - checksum.Type = provider.ResourceChecksumType_RESOURCE_CHECKSUM_TYPE_MD5 - checksum.Sum = md5 - } - } else { - checksum.Type = provider.ResourceChecksumType_RESOURCE_CHECKSUM_TYPE_UNSET - } + checksum.Type = provider.ResourceChecksumType_RESOURCE_CHECKSUM_TYPE_UNSET var ownerID *userv1beta1.UserId if stat.Uid != 0 { @@ -230,13 +199,13 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.Cep return } -func (user *User) resolveRef(ref *provider.Reference) (str string, err error) { +func (user *User) resolveRef(ref *provider.Reference) (string, error) { if ref == nil { - return "", fmt.Errorf("cephfs: nil reference") + return "", fmt.Errorf("cephfs: nil reference provided") } - if str = ref.GetPath(); str == "" { - return "", errtypes.NotSupported("cephfs: entry IDs not currently supported") + if ref.GetPath() == "" { + return "", errtypes.NotSupported("cephfs: path not provided, id based refs are not supported") } - return + return ref.GetPath(), nil } diff --git a/pkg/storage/fs/cephfs/utils.go b/pkg/storage/fs/cephfs/utils.go index 8649e5323b..fa868ec895 100644 --- a/pkg/storage/fs/cephfs/utils.go +++ 
b/pkg/storage/fs/cephfs/utils.go @@ -22,13 +22,7 @@ package cephfs import ( - "crypto/md5" - "encoding/hex" - "fmt" - "io" - "os" "path/filepath" - "strconv" goceph "github.com/ceph/go-ceph/cephfs" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -74,75 +68,6 @@ func isDir(t provider.ResourceType) bool { return t == provider.ResourceType_RESOURCE_TYPE_CONTAINER } -// TODO: Use when fileids are available -/* -func (fs *cephfs) makeFIDPath(fid string) string { - return "" // filepath.Join(fs.conf.EIDFolder, fid) EIDFolder does not exist -} - -func (fs *cephfs) makeFID(absolutePath string, inode string) (rid *provider.ResourceId, err error) { - sum := md5.New() - sum.Write([]byte(absolutePath)) - fid := fmt.Sprintf("%s-%s", hex.EncodeToString(sum.Sum(nil)), inode) - rid = &provider.ResourceId{OpaqueId: fid} - - _ = fs.adminConn.adminMount.Link(absolutePath, fs.makeFIDPath(fid)) - _ = fs.adminConn.adminMount.SetXattr(absolutePath, xattrEID, []byte(fid), 0) - - return -} - -func (fs *cephfs) getFIDPath(cv *cacheVal, path string) (fid string, err error) { - var buffer []byte - if buffer, err = cv.mount.GetXattr(path, xattrEID); err != nil { - return - } - - return fs.makeFIDPath(string(buffer)), err -} -*/ - -func calcChecksum(filepath string, mt Mount, stat Statx) (checksum string, err error) { - file, err := mt.Open(filepath, os.O_RDONLY, 0) - defer closeFile(file) - if err != nil { - return - } - hash := md5.New() - if _, err = io.Copy(hash, file); err != nil { - return - } - checksum = hex.EncodeToString(hash.Sum(nil)) - // we don't care if they fail, the checksum will just be recalculated if an error happens - _ = mt.SetXattr(filepath, xattrMd5ts, []byte(strconv.FormatInt(stat.Mtime.Sec, 10)), 0) - _ = mt.SetXattr(filepath, xattrMd5, []byte(checksum), 0) - - return -} - -func resolveRevRef(mt Mount, ref *provider.Reference, revKey string) (str string, err error) { - var buf []byte - if ref.GetResourceId() != nil { - str, err = 
mt.Readlink(filepath.Join(snap, revKey, ref.ResourceId.OpaqueId)) - if err != nil { - return "", fmt.Errorf("cephfs: invalid reference %+v", ref) - } - } else if str = ref.GetPath(); str != "" { - buf, err = mt.GetXattr(str, xattrEID) - if err != nil { - return - } - str, err = mt.Readlink(filepath.Join(snap, revKey, string(buf))) - if err != nil { - return - } - } else { - return "", fmt.Errorf("cephfs: empty reference %+v", ref) - } - - return filepath.Join(snap, revKey, str), err -} - func removeLeadingSlash(path string) string { return filepath.Join(".", path) } @@ -182,6 +107,7 @@ func pathGenerator(path string, reverse bool, str chan string) { func walkPath(path string, f func(string) error, reverse bool) (err error) { paths := make(chan string) + // TODO(labkode): carefully review this, a race could happen if pathGenerator gorouting is slow go pathGenerator(path, reverse, paths) for path := range paths { if path == "" { @@ -196,55 +122,3 @@ func walkPath(path string, f func(string) error, reverse bool) (err error) { return } - -// TODO: Use when fileids are available -/* -func (fs *cephfs) writeIndex(oid string, value string) (err error) { - return fs.adminConn.radosIO.WriteFull(oid, []byte(value)) -} - -func (fs *cephfs) removeIndex(oid string) error { - return fs.adminConn.radosIO.Delete(oid) -} - -func (fs *cephfs) resolveIndex(oid string) (fullPath string, err error) { - var i int - var currPath strings.Builder - root := string(filepath.Separator) - offset := uint64(0) - io := fs.adminConn.radosIO - bsize := 4096 - buffer := make([]byte, bsize) - for { - for { //read object - i, err = io.Read(oid, buffer, offset) - offset += uint64(bsize) - currPath.Write(buffer) - if err == nil && i >= bsize { - buffer = buffer[:0] - continue - } else { - offset = 0 - break - } - } - if err != nil { - return - } - - ss := strings.SplitN(currPath.String(), string(filepath.Separator), 2) - if len(ss) != 2 { - if currPath.String() == root { - return - } - - return "", 
fmt.Errorf("cephfs: entry id is not in the form of \"parentID/entryname\"") - } - parentOID := ss[0] - entryName := ss[1] - fullPath = filepath.Join(entryName, fullPath) - oid = parentOID - currPath.Reset() - } -} -*/ diff --git a/pkg/user/manager/ldap/ldap.go b/pkg/user/manager/ldap/ldap.go index 31dbf8d2af..0973f0a99e 100644 --- a/pkg/user/manager/ldap/ldap.go +++ b/pkg/user/manager/ldap/ldap.go @@ -374,6 +374,9 @@ func (m *manager) FindUsers(ctx context.Context, query string, skipFetchingGroup } func (m *manager) GetUserGroups(ctx context.Context, uid *userpb.UserId) ([]string, error) { + if m.c.GroupFilter == "" { + return []string{}, nil + } l, err := utils.GetLDAPConnection(&m.c.LDAPConn) if err != nil { return []string{}, err