diff --git a/internal/grpc/services/gateway/gateway.go b/internal/grpc/services/gateway/gateway.go index 981fee2fa3b..a4d40c391b5 100644 --- a/internal/grpc/services/gateway/gateway.go +++ b/internal/grpc/services/gateway/gateway.go @@ -27,6 +27,8 @@ import ( gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" "github.com/ReneKroon/ttlcache/v2" + "github.com/cs3org/reva/pkg/datatx" + datatxreg "github.com/cs3org/reva/pkg/datatx/manager/registry" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rgrpc" "github.com/cs3org/reva/pkg/sharedconf" @@ -42,27 +44,29 @@ func init() { } type config struct { - AuthRegistryEndpoint string `mapstructure:"authregistrysvc"` - ApplicationAuthEndpoint string `mapstructure:"applicationauthsvc"` - StorageRegistryEndpoint string `mapstructure:"storageregistrysvc"` - AppRegistryEndpoint string `mapstructure:"appregistrysvc"` - PreferencesEndpoint string `mapstructure:"preferencessvc"` - UserShareProviderEndpoint string `mapstructure:"usershareprovidersvc"` - PublicShareProviderEndpoint string `mapstructure:"publicshareprovidersvc"` - OCMShareProviderEndpoint string `mapstructure:"ocmshareprovidersvc"` - OCMInviteManagerEndpoint string `mapstructure:"ocminvitemanagersvc"` - OCMProviderAuthorizerEndpoint string `mapstructure:"ocmproviderauthorizersvc"` - OCMCoreEndpoint string `mapstructure:"ocmcoresvc"` - UserProviderEndpoint string `mapstructure:"userprovidersvc"` - GroupProviderEndpoint string `mapstructure:"groupprovidersvc"` - DataTxEndpoint string `mapstructure:"datatx"` - DataGatewayEndpoint string `mapstructure:"datagateway"` - CommitShareToStorageGrant bool `mapstructure:"commit_share_to_storage_grant"` - CommitShareToStorageRef bool `mapstructure:"commit_share_to_storage_ref"` - DisableHomeCreationOnLogin bool `mapstructure:"disable_home_creation_on_login"` - TransferSharedSecret string `mapstructure:"transfer_shared_secret"` - TransferExpires int64 `mapstructure:"transfer_expires"` - TokenManager string `mapstructure:"token_manager"` + AuthRegistryEndpoint string `mapstructure:"authregistrysvc"` + ApplicationAuthEndpoint string `mapstructure:"applicationauthsvc"` + StorageRegistryEndpoint string `mapstructure:"storageregistrysvc"` + AppRegistryEndpoint string `mapstructure:"appregistrysvc"` + PreferencesEndpoint string `mapstructure:"preferencessvc"` + UserShareProviderEndpoint string `mapstructure:"usershareprovidersvc"` + PublicShareProviderEndpoint string `mapstructure:"publicshareprovidersvc"` + OCMShareProviderEndpoint string `mapstructure:"ocmshareprovidersvc"` + OCMInviteManagerEndpoint string `mapstructure:"ocminvitemanagersvc"` + OCMProviderAuthorizerEndpoint string `mapstructure:"ocmproviderauthorizersvc"` + OCMCoreEndpoint string `mapstructure:"ocmcoresvc"` + UserProviderEndpoint string `mapstructure:"userprovidersvc"` + GroupProviderEndpoint string `mapstructure:"groupprovidersvc"` + DataTxEndpoint string `mapstructure:"datatx"` + DataGatewayEndpoint string `mapstructure:"datagateway"` + CommitShareToStorageGrant bool `mapstructure:"commit_share_to_storage_grant"` + CommitShareToStorageRef bool `mapstructure:"commit_share_to_storage_ref"` + DisableHomeCreationOnLogin bool `mapstructure:"disable_home_creation_on_login"` + TransferSharedSecret string `mapstructure:"transfer_shared_secret"` + TransferExpires int64 `mapstructure:"transfer_expires"` + TokenManager string `mapstructure:"token_manager"` + DatatxManager string `mapstructure:"datatx_manager"` + DatatxManagers map[string]map[string]interface{} `mapstructure:"datatx_managers"` // ShareFolder is the location where to create shares in the recipient's storage provider. ShareFolder string `mapstructure:"share_folder"` DataTransfersFolder string `mapstructure:"data_transfers_folder"` @@ -83,6 +87,10 @@ func (c *config) init() { c.DataTransfersFolder = "Data-Transfers" } + if c.DatatxManager == "" { + c.DatatxManager = "rclone" + } + if c.TokenManager == "" { c.TokenManager = "jwt" } @@ -120,6 +128,7 @@ type svc struct { dataGatewayURL url.URL tokenmgr token.Manager etagCache *ttlcache.Cache `mapstructure:"etag_cache"` + dtxm datatx.Manager } // New creates a new gateway svc that acts as a proxy for any grpc operation. @@ -144,6 +153,11 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) { return nil, err } + datatxManager, err := getDatatxManager(c.DatatxManager, c.DatatxManagers) + if err != nil { + return nil, err + } + etagCache := ttlcache.NewCache() _ = etagCache.SetTTL(time.Duration(c.EtagCacheTTL) * time.Second) etagCache.SkipTTLExtensionOnHit(true) @@ -153,6 +167,7 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) { dataGatewayURL: *u, tokenmgr: tokenManager, etagCache: etagCache, + dtxm: datatxManager, } return s, nil @@ -187,3 +202,11 @@ func getTokenManager(manager string, m map[string]map[string]interface{}) (token return nil, errtypes.NotFound(fmt.Sprintf("driver %s not found for token manager", manager)) } + +func getDatatxManager(manager string, m map[string]map[string]interface{}) (datatx.Manager, error) { + if f, ok := datatxreg.NewFuncs[manager]; ok { + return f(m[manager]) + } + + return nil, errtypes.NotFound(fmt.Sprintf("driver %s not found for token manager", manager)) +} diff --git a/internal/grpc/services/gateway/ocmshareprovider.go b/internal/grpc/services/gateway/ocmshareprovider.go index 872009d9c1f..eda56e163b0 100644 --- a/internal/grpc/services/gateway/ocmshareprovider.go +++ b/internal/grpc/services/gateway/ocmshareprovider.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "path" + "strings" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" @@ -30,6 +31,7 @@ import ( "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rgrpc/status" "github.com/cs3org/reva/pkg/rgrpc/todo/pool" + "github.com/cs3org/reva/pkg/token" "github.com/pkg/errors" ) @@ -219,6 +221,70 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive }, nil } + getShareReq := &ocm.GetReceivedOCMShareRequest{Ref: req.Ref} + getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq) + if err != nil { + log.Err(err).Msg("gateway: error calling GetReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, + }, nil + } + + if getShareRes.Status.Code != rpc.Code_CODE_OK { + log.Error().Msg("gateway: error calling GetReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, + }, nil + } + + share := getShareRes.Share + if share == nil { + panic("gateway: error updating a received share: the share is nil") + } + + if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER { + srcRemote := share.GetShare().GetOwner().GetIdp() + // TODO do we actually know for sure the home path of the src reva instance? + srcPath := strings.TrimPrefix(share.GetShare().GetName(), "/home") + var srcToken string + srcTokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"] + if !ok { + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewNotFound(ctx, "token not found"), + }, nil + } + switch srcTokenOpaque.Decoder { + case "plain": + srcToken = string(srcTokenOpaque.Value) + default: + err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewInternal(ctx, err, "error updating received share"), + }, nil + } + destRemote := share.GetShare().GetGrantee().GetUserId().GetIdp() + destPath := path.Join(s.c.DataTransfersFolder, path.Base(share.GetShare().Name)) + destToken, ok := token.ContextGetToken(ctx) + if !ok || destToken == "" { + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewInternal(ctx, err, "error updating received share"), + }, nil + } + + datatxInfoStatus, err := s.dtxm.CreateTransfer(share.GetShare().GetId().OpaqueId, srcRemote, srcPath, srcToken, destRemote, destPath, destToken) + if err != nil { + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewInternal(ctx, err, "error updating received share"), + }, nil + } + log.Info().Msg("datatx transfer created: " + datatxInfoStatus.String()) + return res, nil + } + // if we don't need to create/delete references then we return early. if !s.c.CommitShareToStorageGrant && !s.c.CommitShareToStorageRef { return res, nil @@ -234,31 +300,7 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive // TODO(labkode): if update field is displayName we need to do a rename on the storage to align // share display name and storage filename. if req.Field.GetState() != ocm.ShareState_SHARE_STATE_INVALID { - if req.Field.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED { - getShareReq := &ocm.GetReceivedOCMShareRequest{Ref: req.Ref} - getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq) - if err != nil { - log.Err(err).Msg("gateway: error calling GetReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, - }, nil - } - - if getShareRes.Status.Code != rpc.Code_CODE_OK { - log.Error().Msg("gateway: error calling GetReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, - }, nil - } - - share := getShareRes.Share - if share == nil { - panic("gateway: error updating a received share: the share is nil") - } + if req.Field.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED || req.Field.GetState() == ocm.ShareState_SHARE_STATE_PENDING { createRefStatus, err := s.createOCMReference(ctx, share.Share) return &ocm.UpdateReceivedOCMShareResponse{ @@ -348,6 +390,7 @@ func (s *svc) createOCMReference(ctx context.Context, share *ocm.Share) (*rpc.St Path: refPath, TargetUri: targetURI, } + fmt.Printf("createReferenceReq: %v \n", createRefReq) c, err := s.findByPath(ctx, refPath) if err != nil { @@ -370,5 +413,7 @@ func (s *svc) createOCMReference(ctx context.Context, share *ocm.Share) (*rpc.St return status.NewInternal(ctx, err, "error updating received share"), nil } + fmt.Printf("reference: %v \n", createRefRes) + return status.NewOK(ctx), nil } diff --git a/internal/grpc/services/ocmshareprovider/ocmshareprovider.go b/internal/grpc/services/ocmshareprovider/ocmshareprovider.go index 4de68610b72..1b4e8676db9 100644 --- a/internal/grpc/services/ocmshareprovider/ocmshareprovider.go +++ b/internal/grpc/services/ocmshareprovider/ocmshareprovider.go @@ -20,19 +20,13 @@ package ocmshareprovider import ( "context" - "path" - "strings" ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" - "github.com/cs3org/reva/pkg/appctx" - "github.com/cs3org/reva/pkg/datatx" - datatxreg "github.com/cs3org/reva/pkg/datatx/manager/registry" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/ocm/share" "github.com/cs3org/reva/pkg/ocm/share/manager/registry" "github.com/cs3org/reva/pkg/rgrpc" "github.com/cs3org/reva/pkg/rgrpc/status" - "github.com/cs3org/reva/pkg/token" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "google.golang.org/grpc" @@ -43,16 +37,13 @@ func init() { } type config struct { - Driver string `mapstructure:"driver"` - Drivers map[string]map[string]interface{} `mapstructure:"drivers"` - DatatxDriver string `mapstructure:"datatxdriver"` - DatatxDrivers map[string]map[string]interface{} `mapstructure:"datatxdrivers"` + Driver string `mapstructure:"driver"` + Drivers map[string]map[string]interface{} `mapstructure:"drivers"` } type service struct { conf *config sm share.Manager - dtxm datatx.Manager } func (c *config) init() { @@ -72,13 +63,6 @@ func getShareManager(c *config) (share.Manager, error) { return nil, errtypes.NotFound("driver not found: " + c.Driver) } -func getDatatxManager(c *config) (datatx.Manager, error) { - if f, ok := datatxreg.NewFuncs[c.DatatxDriver]; ok { - return f(c.DatatxDrivers[c.DatatxDriver]) - } - return nil, errtypes.NotFound("datatx driver not found: " + c.DatatxDriver) -} - func parseConfig(m map[string]interface{}) (*config, error) { c := &config{} if err := mapstructure.Decode(m, c); err != nil { @@ -102,15 +86,9 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) { return nil, err } - dtxm, err := getDatatxManager(c) - if err != nil { - return nil, err - } - service := &service{ conf: c, sm: sm, - dtxm: dtxm, } return service, nil @@ -269,8 +247,6 @@ func (s *service) ListReceivedOCMShares(ctx context.Context, req *ocm.ListReceiv } func (s *service) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceivedOCMShareRequest) (*ocm.UpdateReceivedOCMShareResponse, error) { - log := appctx.GetLogger(ctx) - _, err := s.sm.UpdateReceivedShare(ctx, req.Ref, req.Field) // TODO(labkode): check what to update if err != nil { return &ocm.UpdateReceivedOCMShareResponse{ @@ -278,55 +254,6 @@ func (s *service) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateRec }, nil } - // initiate transfer in case this is a transfer type share - receivedShare, err := s.sm.GetReceivedShare(ctx, req.Ref) - if err != nil { - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewInternal(ctx, err, "error updating received share"), - }, nil - } - if receivedShare.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER { - srcRemote := receivedShare.GetShare().GetOwner().GetIdp() - // remove the home path for webdav transfer calls - // TODO do we actually know for sure the home path of the src reva instance ?? - srcPath := strings.TrimPrefix(receivedShare.GetShare().GetName(), "/home") - var srcToken string - srcTokenOpaque, ok := receivedShare.GetShare().Grantee.Opaque.Map["token"] - if !ok { - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewNotFound(ctx, "token not found"), - }, nil - } - switch srcTokenOpaque.Decoder { - case "plain": - srcToken = string(srcTokenOpaque.Value) - default: - err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder) - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewInternal(ctx, err, "error updating received share"), - }, nil - } - - destRemote := receivedShare.GetShare().GetGrantee().GetUserId().GetIdp() - // TODO how to get the data transfers folder? - destPath := path.Join("/Data-Transfers", path.Base(receivedShare.GetShare().Name)) - destToken, ok := token.ContextGetToken(ctx) - if !ok || destToken == "" { - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewInternal(ctx, err, "error updating received share"), - }, nil - } - - datatxInfoStatus, err := s.dtxm.CreateTransfer(receivedShare.GetShare().GetId().OpaqueId, srcRemote, srcPath, srcToken, destRemote, destPath, destToken) - if err != nil { - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewInternal(ctx, err, "error updating received share"), - }, nil - } - log.Info().Msg("datatx transfer created: " + datatxInfoStatus.String()) - - } - res := &ocm.UpdateReceivedOCMShareResponse{ Status: status.NewOK(ctx), }