diff --git a/changelog/unreleased/datatx-storage-drivers.md b/changelog/unreleased/datatx-storage-drivers.md new file mode 100644 index 0000000000..e81b30e001 --- /dev/null +++ b/changelog/unreleased/datatx-storage-drivers.md @@ -0,0 +1,4 @@ +Enhancement: Storage drivers setup for datatx + +https://github.com/cs3org/reva/pull/3915 +https://github.com/cs3org/reva/issues/3914 \ No newline at end of file diff --git a/cmd/reva/main.go b/cmd/reva/main.go index 03f96278aa..f04984a0a7 100644 --- a/cmd/reva/main.go +++ b/cmd/reva/main.go @@ -81,7 +81,6 @@ var ( shareUpdateCommand(), shareListReceivedCommand(), shareUpdateReceivedCommand(), - transferCreateCommand(), transferGetStatusCommand(), transferCancelCommand(), transferListCommand(), diff --git a/cmd/reva/transfer-create.go b/cmd/reva/transfer-create.go deleted file mode 100644 index 969e3f1bc0..0000000000 --- a/cmd/reva/transfer-create.go +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright 2018-2023 CERN -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// In applying this license, CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -package main - -import ( - "io" - "os" - "strings" - "time" - - userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" - invitepb "github.com/cs3org/go-cs3apis/cs3/ocm/invite/v1beta1" - ocmprovider "github.com/cs3org/go-cs3apis/cs3/ocm/provider/v1beta1" - rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" - ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" - provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/cs3org/reva/pkg/ocm/share" - "github.com/cs3org/reva/pkg/utils" - "github.com/jedib0t/go-pretty/table" - "github.com/pkg/errors" -) - -func transferCreateCommand() *command { - cmd := newCommand("transfer-create") - cmd.Description = func() string { return "create transfer between 2 sites" } - cmd.Usage = func() string { return "Usage: transfer-create [-flags] " } - grantee := cmd.String("grantee", "", "the grantee, receiver of the transfer") - granteeType := cmd.String("granteeType", "user", "the grantee type, one of: user, group (defaults to user)") - idp := cmd.String("idp", "", "the idp of the grantee") - userType := cmd.String("user-type", "primary", "the type of user account, defaults to primary") - - cmd.Action = func(w ...io.Writer) error { - if cmd.NArg() < 1 { - return errors.New("Invalid arguments: " + cmd.Usage()) - } - - if *grantee == "" { - return errors.New("Grantee cannot be empty: use -grantee flag\n" + cmd.Usage()) - } - if *idp == "" { - return errors.New("Idp cannot be empty: use -idp flag\n" + cmd.Usage()) - } - - // the resource to transfer; the path - fn := cmd.Args()[0] - - ctx := getAuthContext() - client, err := getClient() - if err != nil { - return err - } - - u := &userpb.UserId{OpaqueId: *grantee, Idp: *idp, Type: utils.UserTypeMap(*userType)} - - // check if invitation has been accepted - 
acceptedUserRes, err := client.GetAcceptedUser(ctx, &invitepb.GetAcceptedUserRequest{ - RemoteUserId: u, - }) - if err != nil { - return err - } - if acceptedUserRes.Status.Code != rpc.Code_CODE_OK { - return formatError(acceptedUserRes.Status) - } - - // verify resource stats - statReq := &provider.StatRequest{ - Ref: &provider.Reference{Path: fn}, - } - statRes, err := client.Stat(ctx, statReq) - if err != nil { - return err - } - if statRes.Status.Code != rpc.Code_CODE_OK { - return formatError(statRes.Status) - } - - providerInfoResp, err := client.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{ - Domain: *idp, - }) - if err != nil { - return err - } - - gt := provider.GranteeType_GRANTEE_TYPE_USER - if strings.ToLower(*granteeType) == "group" { - gt = provider.GranteeType_GRANTEE_TYPE_GROUP - } - - createShareReq := &ocm.CreateOCMShareRequest{ - Grantee: &provider.Grantee{ - Type: gt, - Id: &provider.Grantee_UserId{ - UserId: u, - }, - }, - ResourceId: statRes.Info.Id, - AccessMethods: []*ocm.AccessMethod{ - share.NewTransferAccessMethod(), - }, - RecipientMeshProvider: providerInfoResp.ProviderInfo, - } - - createShareResponse, err := client.CreateOCMShare(ctx, createShareReq) - if err != nil { - return err - } - if createShareResponse.Status.Code != rpc.Code_CODE_OK { - if createShareResponse.Status.Code == rpc.Code_CODE_NOT_FOUND { - return formatError(statRes.Status) - } - return err - } - - t := table.NewWriter() - t.SetOutputMirror(os.Stdout) - t.AppendHeader(table.Row{"#", "Owner.Idp", "Owner.OpaqueId", "ResourceId", "Type", "Grantee.Idp", "Grantee.OpaqueId", "ShareType", "Created", "Updated"}) - - s := createShareResponse.Share - t.AppendRows([]table.Row{ - {s.Id.OpaqueId, s.Owner.Idp, s.Owner.OpaqueId, s.ResourceId.String(), - s.Grantee.Type.String(), s.Grantee.GetUserId().Idp, s.Grantee.GetUserId().OpaqueId, s.ShareType.String(), - time.Unix(int64(s.Ctime.Seconds), 0), time.Unix(int64(s.Mtime.Seconds), 0)}, - }) - t.Render() - return nil - } - - return cmd -} diff --git a/docs/content/en/docs/tutorials/datatx-tutorial.md b/docs/content/en/docs/tutorials/datatx-tutorial.md index 2da5a5ddf6..4e24cf3a0d 100644 --- a/docs/content/en/docs/tutorials/datatx-tutorial.md +++ b/docs/content/en/docs/tutorials/datatx-tutorial.md @@ -166,4 +166,6 @@ remove_transfer_on_cancel = true [grpc.services.datatx.txdrivers.rclone] remove_transfer_job_on_cancel = true ``` -Currently this setting is recommended. \ No newline at end of file +Currently this setting is recommended. + +*Note that with these settings `transfer-cancel` will remove transfers and jobs even when a transfer cannot actually be cancelled because it is already in an end state, e.g. `finished` or `failed`; in that case `transfer-cancel` effectively acts as a delete.* \ No newline at end of file diff --git a/examples/datatx/datatx.toml b/examples/datatx/datatx.toml index 2a4633758e..b5296f0cb1 100644 --- a/examples/datatx/datatx.toml +++ b/examples/datatx/datatx.toml @@ -9,10 +9,8 @@ data_transfers_folder = "" [grpc.services.datatx] # rclone is currently the only data transfer driver implementation txdriver = "rclone" -# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json) -tx_shares_file = "" -# base folder of the data transfers (eg. 
/home/DataTransfers) -data_transfers_folder = "" +# the storage driver +storagedriver = "json" # if set to 'true' the transfer will always be removed from the db upon cancel request # recommended value is true remove_transfer_on_cancel = true @@ -30,16 +28,24 @@ auth_pass = "{rclone user secret}" # "x-access-token" will result in rclone using request header: X-Access-Token: "...token..." # If not set "bearer" is assumed auth_header = "x-access-token" -# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json) -file = "" # check status job interval in milliseconds job_status_check_interval = 2000 # the job timeout in milliseconds (must be long enough for big transfers!) job_timeout = 120000 +# the storage driver +storagedriver = "json" # if set to 'true' the transfer job will always be removed from the db upon transfer cancel request # recommended value is true remove_transfer_job_on_cancel = true +[grpc.services.datatx.storagedrivers.json] +# the datatx transfers db file (defaults to: /var/tmp/reva/datatx-transfers.json) +file = "" + +[grpc.services.datatx.txdrivers.rclone.storagedrivers.json] +# the transfers jobs db file (defaults to: /var/tmp/reva/transfer-jobs.json) +file = "" + [http.services.ocdav] # reva supports http third party copy enable_http_tpc = true diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index e2be50749b..b9f5011a3d 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -20,15 +20,13 @@ package datatx import ( "context" - "encoding/json" - "io" - "os" - "sync" ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + ctxpkg "github.com/cs3org/reva/pkg/ctx" txdriver "github.com/cs3org/reva/pkg/datatx" txregistry "github.com/cs3org/reva/pkg/datatx/manager/registry" + repoRegistry "github.com/cs3org/reva/pkg/datatx/repository/registry" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rgrpc" "github.com/cs3org/reva/pkg/rgrpc/status" @@ -43,44 +41,23 @@ func init() { type config struct { // transfer driver - TxDriver string `mapstructure:"txdriver"` - TxDrivers map[string]map[string]interface{} `mapstructure:"txdrivers"` - // storage driver to persist share/transfer relation - StorageDriver string `mapstructure:"storage_driver"` - StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"` - TxSharesFile string `mapstructure:"tx_shares_file"` - RemoveTransferOnCancel bool `mapstructure:"remove_transfer_on_cancel"` + TxDriver string `mapstructure:"txdriver"` + TxDrivers map[string]map[string]interface{} `mapstructure:"txdrivers"` + StorageDriver string `mapstructure:"storagedriver"` + StorageDrivers map[string]map[string]interface{} `mapstructure:"storagedrivers"` + RemoveOnCancel bool `mapstructure:"remove_transfer_on_cancel"` } type service struct { conf *config txManager txdriver.Manager - txShareDriver *txShareDriver -} - -type txShareDriver struct { - sync.Mutex // concurrent access to the file - model *txShareModel -} -type txShareModel struct { - File string - TxShares map[string]*txShare `json:"shares"` -} - -type txShare struct { - TxID string - SrcTargetURI string - DestTargetURI string - ShareID string + storageDriver txdriver.Repository } func (c *config) init() { if c.TxDriver == "" { c.TxDriver = "rclone" } - if c.TxSharesFile == "" { - c.TxSharesFile = "/var/tmp/reva/datatx-shares.json" - } } func (s *service) Register(ss *grpc.Server) { @@ -94,6 +71,13 
@@ func getDatatxManager(c *config) (txdriver.Manager, error) { return nil, errtypes.NotFound("datatx service: driver not found: " + c.TxDriver) } +func getStorageManager(c *config) (txdriver.Repository, error) { + if f, ok := repoRegistry.NewFuncs[c.StorageDriver]; ok { + return f(c.StorageDrivers[c.StorageDriver]) + } + return nil, errtypes.NotFound("datatx service: storage driver not found: " + c.StorageDriver) +} + func parseConfig(m map[string]interface{}) (*config, error) { c := &config{} if err := mapstructure.Decode(m, c); err != nil { @@ -116,19 +100,15 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) { return nil, err } - model, err := loadOrCreate(c.TxSharesFile) + storageDriver, err := getStorageManager(c) if err != nil { - err = errors.Wrap(err, "datatx service: error loading the file containing the transfer shares") return nil, err } - txShareDriver := &txShareDriver{ - model: model, - } service := &service{ conf: c, txManager: txManager, - txShareDriver: txShareDriver, + storageDriver: storageDriver, } return service, nil @@ -147,18 +127,16 @@ func (s *service) CreateTransfer(ctx context.Context, req *datatx.CreateTransfer // we always save the transfer regardless of start transfer outcome // only then, if starting fails, can we try to restart it - txShare := &txShare{ + userID := ctxpkg.ContextMustGetUser(ctx).GetId() + transfer := &txdriver.Transfer{ TxID: txInfo.GetId().OpaqueId, SrcTargetURI: req.SrcTargetUri, DestTargetURI: req.DestTargetUri, ShareID: req.GetShareId().OpaqueId, + UserID: userID, } - s.txShareDriver.Lock() - defer s.txShareDriver.Unlock() - - s.txShareDriver.model.TxShares[txInfo.GetId().OpaqueId] = txShare - if err := s.txShareDriver.model.saveTxShare(); err != nil { - err = errors.Wrap(err, "datatx service: error saving transfer share: "+datatx.Status_STATUS_INVALID.String()) + if err := s.storageDriver.StoreTransfer(transfer); err != nil { + err = errors.Wrap(err, "datatx service: error saving transfer: "+datatx.Status_STATUS_INVALID.String()) return &datatx.CreateTransferResponse{ Status: status.NewInvalid(ctx, "error creating transfer"), }, err @@ -180,8 +158,8 @@ func (s *service) CreateTransfer(ctx context.Context, req *datatx.CreateTransfer } func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) { - txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().GetOpaqueId()] - if !ok { + transfer, err := s.storageDriver.GetTransfer(req.TxId.OpaqueId) + if err != nil { return nil, errtypes.InternalError("datatx service: transfer not found") } @@ -194,7 +172,7 @@ func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransfer }, err } - txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID} + txInfo.ShareId = &ocm.ShareId{OpaqueId: transfer.ShareID} return &datatx.GetTransferStatusResponse{ Status: status.NewOK(ctx), @@ -203,15 +181,14 @@ func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransfer } func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) { - txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().OpaqueId] - if !ok { + transfer, err := s.storageDriver.GetTransfer(req.TxId.OpaqueId) + if err != nil { return nil, errtypes.InternalError("datatx service: transfer not found") } transferRemovedMessage := "" - if s.conf.RemoveTransferOnCancel { - delete(s.txShareDriver.model.TxShares, req.TxId.GetOpaqueId()) - if 
err := s.txShareDriver.model.saveTxShare(); err != nil { + if s.conf.RemoveOnCancel { + if err := s.storageDriver.DeleteTransfer(transfer); err != nil { err = errors.Wrap(err, "datatx service: error deleting transfer: "+datatx.Status_STATUS_INVALID.String()) return &datatx.CancelTransferResponse{ Status: status.NewInvalid(ctx, "error cancelling transfer"), @@ -222,7 +199,7 @@ func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransfer txInfo, err := s.txManager.CancelTransfer(ctx, req.GetTxId().OpaqueId) if err != nil { - txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID} + txInfo.ShareId = &ocm.ShareId{OpaqueId: transfer.ShareID} err = errors.Wrapf(err, "(%v) datatx service: error cancelling transfer", transferRemovedMessage) return &datatx.CancelTransferResponse{ Status: status.NewInternal(ctx, err, "error cancelling transfer"), @@ -230,7 +207,7 @@ func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransfer }, err } - txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID} + txInfo.ShareId = &ocm.ShareId{OpaqueId: transfer.ShareID} return &datatx.CancelTransferResponse{ Status: status.NewOK(ctx), @@ -239,26 +216,23 @@ func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransfer } func (s *service) ListTransfers(ctx context.Context, req *datatx.ListTransfersRequest) (*datatx.ListTransfersResponse, error) { - filters := req.Filters - var txInfos []*datatx.TxInfo - for _, txShare := range s.txShareDriver.model.TxShares { - if len(filters) == 0 { - txInfos = append(txInfos, &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txShare.TxID}, - ShareId: &ocm.ShareId{OpaqueId: txShare.ShareID}, - }) - } else { - for _, f := range filters { - if f.Type == datatx.ListTransfersRequest_Filter_TYPE_SHARE_ID { - if f.GetShareId().GetOpaqueId() == txShare.ShareID { - txInfos = append(txInfos, &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txShare.TxID}, - ShareId: &ocm.ShareId{OpaqueId: txShare.ShareID}, - }) - } - } - } - } + userID := ctxpkg.ContextMustGetUser(ctx).GetId() + transfers, err := s.storageDriver.ListTransfers(req.Filters, userID) + if err != nil { + err = errors.Wrap(err, "datatx service: error listing transfers") + var txInfos []*datatx.TxInfo + return &datatx.ListTransfersResponse{ + Status: status.NewInternal(ctx, err, "error listing transfers"), + Transfers: txInfos, + }, err + } + + txInfos := []*datatx.TxInfo{} + for _, transfer := range transfers { + txInfos = append(txInfos, &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transfer.TxID}, + ShareId: &ocm.ShareId{OpaqueId: transfer.ShareID}, + }) } return &datatx.ListTransfersResponse{ @@ -268,8 +242,8 @@ func (s *service) ListTransfers(ctx context.Context, req *datatx.ListTransfersRe } func (s *service) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRequest) (*datatx.RetryTransferResponse, error) { - txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().GetOpaqueId()] - if !ok { + transfer, err := s.storageDriver.GetTransfer(req.TxId.OpaqueId) + if err != nil { return nil, errtypes.InternalError("datatx service: transfer not found") } @@ -282,61 +256,10 @@ func (s *service) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRe }, err } - txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID} + txInfo.ShareId = &ocm.ShareId{OpaqueId: transfer.ShareID} return &datatx.RetryTransferResponse{ Status: status.NewOK(ctx), TxInfo: txInfo, }, nil } - -func loadOrCreate(file string) (*txShareModel, error) { - _, err := os.Stat(file) - if os.IsNotExist(err) 
{ - if err := os.WriteFile(file, []byte("{}"), 0700); err != nil { - err = errors.Wrap(err, "datatx service: error creating the transfer shares storage file: "+file) - return nil, err - } - } - - fd, err := os.OpenFile(file, os.O_CREATE, 0644) - if err != nil { - err = errors.Wrap(err, "datatx service: error opening the transfer shares storage file: "+file) - return nil, err - } - defer fd.Close() - - data, err := io.ReadAll(fd) - if err != nil { - err = errors.Wrap(err, "datatx service: error reading the data") - return nil, err - } - - model := &txShareModel{} - if err := json.Unmarshal(data, model); err != nil { - err = errors.Wrap(err, "datatx service: error decoding transfer shares data to json") - return nil, err - } - - if model.TxShares == nil { - model.TxShares = make(map[string]*txShare) - } - - model.File = file - return model, nil -} - -func (m *txShareModel) saveTxShare() error { - data, err := json.Marshal(m) - if err != nil { - err = errors.Wrap(err, "datatx service: error encoding transfer share data to json") - return err - } - - if err := os.WriteFile(m.File, data, 0644); err != nil { - err = errors.Wrap(err, "datatx service: error writing transfer share data to file: "+m.File) - return err - } - - return nil -} diff --git a/pkg/datatx/datatx.go b/pkg/datatx/datatx.go index 8c430e2bbc..616a5a64ca 100644 --- a/pkg/datatx/datatx.go +++ b/pkg/datatx/datatx.go @@ -21,6 +21,7 @@ package datatx import ( "context" + userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" ) @@ -37,3 +38,24 @@ type Manager interface { // Note that tokens must still be valid. RetryTransfer(ctx context.Context, transferID string) (*datatx.TxInfo, error) } + +// Transfer represents datatx transfer. +type Transfer struct { + TxID string + SrcTargetURI string + DestTargetURI string + ShareID string + UserID *userv1beta1.UserId +} + +// Repository the interface that any storage driver should implement. +type Repository interface { + // StoreTransfer stores the transfer by its TxID + StoreTransfer(transfer *Transfer) error + // DeleteTransfer deletes the transfer by its TxID + DeleteTransfer(transfer *Transfer) error + // GetTransfer returns the transfer with the specified transfer id + GetTransfer(txID string) (*Transfer, error) + // ListTransfers returns a filtered list of transfers + ListTransfers(Filters []*datatx.ListTransfersRequest_Filter, UserID *userv1beta1.UserId) ([]*Transfer, error) +} diff --git a/pkg/datatx/manager/loader/loader.go b/pkg/datatx/manager/loader/loader.go index 1452dc5dae..2c9d77d745 100644 --- a/pkg/datatx/manager/loader/loader.go +++ b/pkg/datatx/manager/loader/loader.go @@ -21,5 +21,7 @@ package loader import ( // Load datatx drivers. _ "github.com/cs3org/reva/pkg/datatx/manager/rclone" + _ "github.com/cs3org/reva/pkg/datatx/manager/rclone/repository/json" + _ "github.com/cs3org/reva/pkg/datatx/repository/json" // Add your own here. 
) diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index 0e9d2dfe55..932677a106 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -23,19 +23,18 @@ import ( "context" "encoding/json" "fmt" - "io" "net/http" "net/url" - "os" "path" "strconv" - "sync" "time" datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/pkg/appctx" txdriver "github.com/cs3org/reva/pkg/datatx" + "github.com/cs3org/reva/pkg/datatx/manager/rclone/repository" + repoRegistry "github.com/cs3org/reva/pkg/datatx/manager/rclone/repository/registry" registry "github.com/cs3org/reva/pkg/datatx/manager/registry" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rhttp" @@ -50,9 +49,6 @@ func init() { func (c *config) init(m map[string]interface{}) { // set sane defaults - if c.File == "" { - c.File = "/var/tmp/reva/datatx-transfers.json" - } if c.JobStatusCheckInterval == 0 { c.JobStatusCheckInterval = 2000 } @@ -62,21 +58,22 @@ func (c *config) init(m map[string]interface{}) { } type config struct { - Endpoint string `mapstructure:"endpoint"` - AuthUser string `mapstructure:"auth_user"` // rclone basicauth user - AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass - AuthHeader string `mapstructure:"auth_header"` - File string `mapstructure:"file"` - JobStatusCheckInterval int `mapstructure:"job_status_check_interval"` - JobTimeout int `mapstructure:"job_timeout"` - Insecure bool `mapstructure:"insecure"` - RemoveTransferJobOnCancel bool `mapstructure:"remove_transfer_job_on_cancel"` + Endpoint string `mapstructure:"endpoint"` + AuthUser string `mapstructure:"auth_user"` // rclone basicauth user + AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass + AuthHeader string `mapstructure:"auth_header"` + JobStatusCheckInterval int `mapstructure:"job_status_check_interval"` + JobTimeout int `mapstructure:"job_timeout"` + Insecure bool `mapstructure:"insecure"` + RemoveTransferJobOnCancel bool `mapstructure:"remove_transfer_job_on_cancel"` + StorageDriver string `mapstructure:"storagedriver"` + StorageDrivers map[string]map[string]interface{} `mapstructure:"storagedrivers"` } type rclone struct { config *config client *http.Client - pDriver *pDriver + storage repository.Repository } type rcloneHTTPErrorRes struct { @@ -86,30 +83,6 @@ type rcloneHTTPErrorRes struct { Status int `json:"status"` } -type transferModel struct { - File string - Transfers map[string]*transfer `json:"transfers"` -} - -// persistency driver. -type pDriver struct { - sync.Mutex // concurrent access to the file - model *transferModel -} - -type transfer struct { - TransferID string - JobID int64 - TransferStatus datatx.Status - SrcToken string - SrcRemote string - SrcPath string - DestToken string - DestRemote string - DestPath string - Ctime string -} - // txEndStatuses final statuses that cannot be changed anymore. 
var txEndStatuses = map[string]int32{ "STATUS_INVALID": 0, @@ -138,21 +111,15 @@ func New(m map[string]interface{}) (txdriver.Manager, error) { client := rhttp.GetHTTPClient(rhttp.Insecure(c.Insecure)) - // The persistency driver - // Load or create 'db' - model, err := loadOrCreate(c.File) + storage, err := getStorageManager(c) if err != nil { - err = errors.Wrap(err, "error loading the file containing the transfers") return nil, err } - pDriver := &pDriver{ - model: model, - } return &rclone{ config: c, client: client, - pDriver: pDriver, + storage: storage, }, nil } @@ -165,56 +132,11 @@ func parseConfig(m map[string]interface{}) (*config, error) { return c, nil } -func loadOrCreate(file string) (*transferModel, error) { - _, err := os.Stat(file) - if os.IsNotExist(err) { - if err := os.WriteFile(file, []byte("{}"), 0700); err != nil { - err = errors.Wrap(err, "error creating the transfers storage file: "+file) - return nil, err - } - } - - fd, err := os.OpenFile(file, os.O_CREATE, 0644) - if err != nil { - err = errors.Wrap(err, "error opening the transfers storage file: "+file) - return nil, err - } - defer fd.Close() - - data, err := io.ReadAll(fd) - if err != nil { - err = errors.Wrap(err, "error reading the data") - return nil, err - } - - model := &transferModel{} - if err := json.Unmarshal(data, model); err != nil { - err = errors.Wrap(err, "error decoding transfers data to json") - return nil, err - } - - if model.Transfers == nil { - model.Transfers = make(map[string]*transfer) - } - - model.File = file - return model, nil -} - -// saveTransfer saves the transfer. If an error is specified than that error will be returned, possibly wrapped with additional errors. -func (m *transferModel) saveTransfer(e error) error { - data, err := json.Marshal(m) - if err != nil { - e = errors.Wrap(err, "error encoding transfer data to json") - return e - } - - if err := os.WriteFile(m.File, data, 0644); err != nil { - e = errors.Wrap(err, "error writing transfer data to file: "+m.File) - return e +func getStorageManager(c *config) (repository.Repository, error) { + if f, ok := repoRegistry.NewFuncs[c.StorageDriver]; ok { + return f(c.StorageDrivers[c.StorageDriver]) } - - return e + return nil, errtypes.NotFound("rclone service: storage driver not found: " + c.StorageDriver) } // CreateTransfer creates a transfer job and returns a TxInfo object that includes a unique transfer id. 
@@ -244,50 +166,54 @@ func (driver *rclone) CreateTransfer(ctx context.Context, srcTargetURI string, d func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) { logger := appctx.GetLogger(ctx) - driver.pDriver.Lock() - defer driver.pDriver.Unlock() - var txID string var cTime *typespb.Timestamp if transferID == "" { txID = uuid.New().String() cTime = &typespb.Timestamp{Seconds: uint64(time.Now().Unix())} - } else { // restart existing transfer if transferID is specified - logger.Debug().Msgf("Restarting transfer (txID: %s)", transferID) + } else { // restart existing transfer job if transferID is specified + logger.Debug().Msgf("Restarting transfer job (txID: %s)", transferID) txID = transferID - transfer, err := driver.pDriver.model.getTransfer(txID) + job, err := driver.storage.GetJob(txID) if err != nil { - err = errors.Wrap(err, "rclone: error retrying transfer (transferID: "+txID+")") + err = errors.Wrap(err, "rclone: error retrying transfer job (transferID: "+txID+")") return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, Status: datatx.Status_STATUS_INVALID, Ctime: nil, }, err } - seconds, _ := strconv.ParseInt(transfer.Ctime, 10, 64) + seconds, _ := strconv.ParseInt(job.Ctime, 10, 64) cTime = &typespb.Timestamp{Seconds: uint64(seconds)} - _, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] + _, endStatusFound := txEndStatuses[job.TransferStatus.String()] if !endStatusFound { - err := errors.New("rclone: transfer still running, unable to restart") + err := errors.New("rclone: job still running, unable to restart") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: job.TransferStatus, + Ctime: cTime, + }, err + } + srcToken = job.SrcToken + srcRemote = job.SrcRemote + srcPath = job.SrcPath + destToken = job.DestToken + destRemote = job.DestRemote + destPath = job.DestPath + if err := driver.storage.DeleteJob(job); err != nil { + err = errors.Wrap(err, "rclone: transfer still running, unable to restart") return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, - Status: transfer.TransferStatus, + Status: job.TransferStatus, Ctime: cTime, }, err } - srcToken = transfer.SrcToken - srcRemote = transfer.SrcRemote - srcPath = transfer.SrcPath - destToken = transfer.DestToken - destRemote = transfer.DestRemote - destPath = transfer.DestPath - delete(driver.pDriver.model.Transfers, txID) } transferStatus := datatx.Status_STATUS_TRANSFER_NEW - transfer := &transfer{ + job := &repository.Job{ TransferID: txID, JobID: int64(-1), TransferStatus: transferStatus, @@ -300,8 +226,6 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote Ctime: fmt.Sprint(cTime.Seconds), // TODO do we need nanos here? 
} - driver.pDriver.model.Transfers[txID] = transfer - type rcloneAsyncReqJSON struct { SrcFs string `json:"srcFs"` DstFs string `json:"dstFs"` @@ -325,70 +249,94 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote } data, err := json.Marshal(rcloneReq) if err != nil { - err = errors.Wrap(err, "rclone: error pulling transfer: error marshalling rclone req data") - transfer.TransferStatus = datatx.Status_STATUS_INVALID + err = errors.Wrap(err, "rclone: transfer job error: error marshalling rclone req data") + job.TransferStatus = datatx.Status_STATUS_INVALID + var e error + if e = driver.storage.StoreJob(job); e != nil { + e = errors.Wrap(e, err.Error()) + } return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, - Status: datatx.Status_STATUS_INVALID, + Status: job.TransferStatus, Ctime: cTime, - }, driver.pDriver.model.saveTransfer(err) + }, e } transferFileMethod := "/sync/copy" remotePathIsFolder, err := driver.remotePathIsFolder(srcRemote, srcPath, srcToken) if err != nil { - err = errors.Wrap(err, "rclone: error pulling transfer: error stating src path") - transfer.TransferStatus = datatx.Status_STATUS_INVALID + err = errors.Wrap(err, "rclone: transfer job error: error stating src path") + job.TransferStatus = datatx.Status_STATUS_INVALID + var e error + if e = driver.storage.StoreJob(job); e != nil { + e = errors.Wrap(e, err.Error()) + } return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, - Status: datatx.Status_STATUS_INVALID, + Status: job.TransferStatus, Ctime: cTime, - }, driver.pDriver.model.saveTransfer(err) + }, e } if !remotePathIsFolder { - err = errors.Wrap(err, "rclone: error pulling transfer: path is a file, only folder transfer is implemented") - transfer.TransferStatus = datatx.Status_STATUS_INVALID + err = errors.Wrap(err, "rclone: transfer job error: path is a file, only folder transfer is implemented") + job.TransferStatus = datatx.Status_STATUS_INVALID + var e error + if e = driver.storage.StoreJob(job); e != nil { + e = errors.Wrap(e, err.Error()) + } return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, - Status: datatx.Status_STATUS_INVALID, + Status: job.TransferStatus, Ctime: cTime, - }, driver.pDriver.model.saveTransfer(err) + }, e } u, err := url.Parse(driver.config.Endpoint) if err != nil { - err = errors.Wrap(err, "rclone: error pulling transfer: error parsing driver endpoint") - transfer.TransferStatus = datatx.Status_STATUS_INVALID + err = errors.Wrap(err, "rclone: transfer job error: error parsing driver endpoint") + job.TransferStatus = datatx.Status_STATUS_INVALID + var e error + if e = driver.storage.StoreJob(job); e != nil { + e = errors.Wrap(e, err.Error()) + } return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, - Status: datatx.Status_STATUS_INVALID, + Status: job.TransferStatus, Ctime: cTime, - }, driver.pDriver.model.saveTransfer(err) + }, e } u.Path = path.Join(u.Path, transferFileMethod) requestURL := u.String() req, err := http.NewRequest(http.MethodPost, requestURL, bytes.NewReader(data)) if err != nil { - err = errors.Wrap(err, "rclone: error pulling transfer: error framing post request") - transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + err = errors.Wrap(err, "rclone: transfer job error: error framing post request") + job.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + var e error + if e = driver.storage.StoreJob(job); e != nil { + e = errors.Wrap(e, err.Error()) + } return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, - Status: transfer.TransferStatus, + Status: 
job.TransferStatus, Ctime: cTime, - }, driver.pDriver.model.saveTransfer(err) + }, e } req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(driver.config.AuthUser, driver.config.AuthPass) res, err := driver.client.Do(req) if err != nil { - err = errors.Wrap(err, "rclone: error pulling transfer: error sending post request") - transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + err = errors.Wrap(err, "rclone: transfer job error: error sending post request") + job.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + var e error + if e = driver.storage.StoreJob(job); e != nil { + e = errors.Wrap(e, err.Error()) + } return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, - Status: transfer.TransferStatus, + Status: job.TransferStatus, Ctime: cTime, - }, driver.pDriver.model.saveTransfer(err) + }, e } defer res.Body.Close() @@ -396,21 +344,29 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote if res.StatusCode != http.StatusOK { var errorResData rcloneHTTPErrorRes if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { - err = errors.Wrap(err, "rclone driver: error decoding response data") - transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + err = errors.Wrap(err, "rclone driver: error decoding rclone response data") + job.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + var e error + if e = driver.storage.StoreJob(job); e != nil { + e = errors.Wrap(e, err.Error()) + } return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, - Status: transfer.TransferStatus, + Status: job.TransferStatus, Ctime: cTime, - }, driver.pDriver.model.saveTransfer(err) + }, e + } + err := errors.New("rclone driver: rclone request responded with error, " + fmt.Sprintf(" status: %v, error: %v", errorResData.Status, errorResData.Error)) + job.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + var e error + if e = driver.storage.StoreJob(job); e != nil { + e = errors.Wrap(e, err.Error()) } - e := errors.New("rclone: rclone request responded with error, " + fmt.Sprintf(" status: %v, error: %v", errorResData.Status, errorResData.Error)) - transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, - Status: transfer.TransferStatus, + Status: job.TransferStatus, Ctime: cTime, - }, driver.pDriver.model.saveTransfer(e) + }, e } type rcloneAsyncResJSON struct { @@ -418,19 +374,33 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote } var resData rcloneAsyncResJSON if err = json.NewDecoder(res.Body).Decode(&resData); err != nil { - err = errors.Wrap(err, "rclone: error decoding response data") - transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + err = errors.Wrap(err, "rclone driver: error decoding response data") + job.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + var e error + if e = driver.storage.StoreJob(job); e != nil { + e = errors.Wrap(e, err.Error()) + } return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, - Status: transfer.TransferStatus, + Status: job.TransferStatus, Ctime: cTime, - }, driver.pDriver.model.saveTransfer(err) + }, e } - transfer.JobID = resData.JobID + job.JobID = resData.JobID + + if err := driver.storage.StoreJob(job); err != nil { + err = errors.Wrap(err, "rclone driver: transfer job error") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: cTime, + }, err + } - if err := driver.pDriver.model.saveTransfer(nil); err != 
nil { - err = errors.Wrap(err, "rclone: error pulling transfer") + // the initial save when everything went ok + if err := driver.storage.StoreJob(job); err != nil { + err = errors.Wrap(err, "rclone driver: error starting transfer job") return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txID}, Status: datatx.Status_STATUS_INVALID, @@ -444,22 +414,17 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote startTimeMs := time.Now().Nanosecond() / 1000 timeout := driver.config.JobTimeout - driver.pDriver.Lock() - defer driver.pDriver.Unlock() - for { - transfer, err := driver.pDriver.model.getTransfer(txID) + job, err := driver.storage.GetJob(txID) if err != nil { - transfer.TransferStatus = datatx.Status_STATUS_INVALID - err = driver.pDriver.model.saveTransfer(err) - logger.Error().Err(err).Msgf("rclone driver: unable to retrieve transfer with id: %v", txID) + logger.Error().Err(err).Msgf("rclone driver: unable to retrieve transfer job with id: %v", txID) break } // check for end status first - _, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] - if endStatusFound { - logger.Info().Msgf("rclone driver: transfer endstatus reached: %v", transfer.TransferStatus) + _, endStatusreached := txEndStatuses[job.TransferStatus.String()] + if endStatusreached { + logger.Info().Msgf("rclone driver: transfer job endstatus reached: %v", job.TransferStatus) break } @@ -468,16 +433,16 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote timePastMs := currentTimeMs - startTimeMs if timePastMs > timeout { - logger.Info().Msgf("rclone driver: transfer timed out: %vms (timeout = %v)", timePastMs, timeout) + logger.Info().Msgf("rclone driver: transfer job timed out: %vms (timeout = %v)", timePastMs, timeout) // set status to EXPIRED and save - transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_EXPIRED - if err := driver.pDriver.model.saveTransfer(nil); err != nil { - logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + job.TransferStatus = datatx.Status_STATUS_TRANSFER_EXPIRED + if err := driver.storage.StoreJob(job); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer job failed: %v", err) } break } - jobID := transfer.JobID + jobID := job.JobID type rcloneStatusReqJSON struct { JobID int64 `json:"jobid"` } @@ -488,9 +453,9 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote data, err := json.Marshal(rcloneStatusReq) if err != nil { logger.Error().Err(err).Msgf("rclone driver: marshalling request failed: %v", err) - transfer.TransferStatus = datatx.Status_STATUS_INVALID - if err := driver.pDriver.model.saveTransfer(nil); err != nil { - logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + job.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.storage.StoreJob(job); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer job failed: %v", err) } break } @@ -500,9 +465,9 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote u, err := url.Parse(driver.config.Endpoint) if err != nil { logger.Error().Err(err).Msgf("rclone driver: could not parse driver endpoint: %v", err) - transfer.TransferStatus = datatx.Status_STATUS_INVALID - if err := driver.pDriver.model.saveTransfer(nil); err != nil { - logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + job.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.storage.StoreJob(job); err != nil { 
+ logger.Error().Err(err).Msgf("rclone driver: save transfer job failed: %v", err) } break } @@ -512,9 +477,9 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote req, err := http.NewRequest(http.MethodPost, requestURL, bytes.NewReader(data)) if err != nil { logger.Error().Err(err).Msgf("rclone driver: error framing post request: %v", err) - transfer.TransferStatus = datatx.Status_STATUS_INVALID - if err := driver.pDriver.model.saveTransfer(nil); err != nil { - logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + job.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.storage.StoreJob(job); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer job failed: %v", err) } break } @@ -523,9 +488,9 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote res, err := driver.client.Do(req) if err != nil { logger.Error().Err(err).Msgf("rclone driver: error sending post request: %v", err) - transfer.TransferStatus = datatx.Status_STATUS_INVALID - if err := driver.pDriver.model.saveTransfer(nil); err != nil { - logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + job.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.storage.StoreJob(job); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer job failed: %v", err) } break } @@ -539,9 +504,9 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote logger.Error().Err(err).Msgf("rclone driver: error reading response body: %v", err) } logger.Error().Err(err).Msgf("rclone driver: rclone request responded with error, status: %v, error: %v", errorResData.Status, errorResData.Error) - transfer.TransferStatus = datatx.Status_STATUS_INVALID - if err := driver.pDriver.model.saveTransfer(nil); err != nil { - logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + job.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.storage.StoreJob(job); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer job failed: %v", err) } break } @@ -566,9 +531,9 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote if resData.Error != "" { logger.Error().Err(err).Msgf("rclone driver: rclone responded with error: %v", resData.Error) - transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED - if err := driver.pDriver.model.saveTransfer(nil); err != nil { - logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + job.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + if err := driver.storage.StoreJob(job); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer job: %v", err) break } break @@ -577,9 +542,9 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote // transfer complete if resData.Finished && resData.Success { logger.Info().Msg("rclone driver: transfer job finished") - transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_COMPLETE - if err := driver.pDriver.model.saveTransfer(nil); err != nil { - logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + job.TransferStatus = datatx.Status_STATUS_TRANSFER_COMPLETE + if err := driver.storage.StoreJob(job); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer job: %v", err) break } break @@ -588,9 +553,9 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote // transfer 
completed unsuccessfully without error if resData.Finished && !resData.Success { logger.Info().Msgf("rclone driver: transfer job failed") - transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED - if err := driver.pDriver.model.saveTransfer(nil); err != nil { - logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + job.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + if err := driver.storage.StoreJob(job); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer job: %v", err) break } break @@ -599,9 +564,9 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote // transfer not yet finished: continue if !resData.Finished { logger.Info().Msgf("rclone driver: transfer job in progress") - transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_IN_PROGRESS - if err := driver.pDriver.model.saveTransfer(nil); err != nil { - logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + job.TransferStatus = datatx.Status_STATUS_TRANSFER_IN_PROGRESS + if err := driver.storage.StoreJob(job); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer job: %v", err) break } } @@ -619,7 +584,7 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote // GetTransferStatus returns the status of the transfer with the specified job id. func (driver *rclone) GetTransferStatus(ctx context.Context, transferID string) (*datatx.TxInfo, error) { - transfer, err := driver.pDriver.model.getTransfer(transferID) + job, err := driver.storage.GetJob(transferID) if err != nil { return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, @@ -627,17 +592,17 @@ func (driver *rclone) GetTransferStatus(ctx context.Context, transferID string) Ctime: nil, }, err } - cTime, _ := strconv.ParseInt(transfer.Ctime, 10, 64) + cTime, _ := strconv.ParseInt(job.Ctime, 10, 64) return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, - Status: transfer.TransferStatus, + Status: job.TransferStatus, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, nil } // CancelTransfer cancels the transfer with the specified transfer id. 
func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*datatx.TxInfo, error) { - transfer, err := driver.pDriver.model.getTransfer(transferID) + job, err := driver.storage.GetJob(transferID) if err != nil { return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, @@ -646,11 +611,11 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d }, err } - cTime, _ := strconv.ParseInt(transfer.Ctime, 10, 64) + cTime, _ := strconv.ParseInt(job.Ctime, 10, 64) + // rclone cancel may fail so remove job from model first to be sure transferRemovedMessage := "" if driver.config.RemoveTransferJobOnCancel { - delete(driver.pDriver.model.Transfers, transfer.TransferID) - if err := driver.pDriver.model.saveTransfer(nil); err != nil { + if err := driver.storage.DeleteJob(job); err != nil { return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, @@ -660,9 +625,9 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d transferRemovedMessage = "(transfer job successfully removed)" } - _, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] + _, endStatusFound := txEndStatuses[job.TransferStatus.String()] if endStatusFound { - err := errors.Wrapf(errors.New("rclone driver: transfer already in end state"), transferRemovedMessage) + err := errors.Wrapf(errors.New("rclone driver: job already in end state"), transferRemovedMessage) return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, @@ -675,12 +640,12 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d JobID int64 `json:"jobid"` } rcloneCancelTransferReq := &rcloneStopRequest{ - JobID: transfer.JobID, + JobID: job.JobID, } data, err := json.Marshal(rcloneCancelTransferReq) if err != nil { - err := errors.Wrapf(errors.New("rclone driver: error marshalling rclone req data"), transferRemovedMessage) + err := errors.Wrapf(errors.New("rclone driver: error marshalling rclone job/stop req data"), transferRemovedMessage) return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, @@ -775,13 +740,16 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d }, errors.New(resData.Error) } - transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_CANCELLED - if err := driver.pDriver.model.saveTransfer(nil); err != nil { - return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferID}, - Status: datatx.Status_STATUS_INVALID, - Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, - }, err + // only update when job's not removed + if !driver.config.RemoveTransferJobOnCancel { + job.TransferStatus = datatx.Status_STATUS_TRANSFER_CANCELLED + if err := driver.storage.StoreJob(job); err != nil { + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } } return &datatx.TxInfo{ @@ -797,15 +765,6 @@ func (driver *rclone) RetryTransfer(ctx context.Context, transferID string) (*da return driver.startJob(ctx, transferID, "", "", "", "", "", "") } -// getTransfer returns the transfer with the specified transfer ID. 
-func (m *transferModel) getTransfer(transferID string) (*transfer, error) { - transfer, ok := m.Transfers[transferID] - if !ok { - return nil, errors.New("rclone driver: invalid transfer ID") - } - return transfer, nil -} - func (driver *rclone) remotePathIsFolder(remote string, remotePath string, remoteToken string) (bool, error) { type rcloneListReqJSON struct { Fs string `json:"fs"` diff --git a/pkg/datatx/manager/rclone/repository/json/json.go b/pkg/datatx/manager/rclone/repository/json/json.go new file mode 100644 index 0000000000..2d31f70ff1 --- /dev/null +++ b/pkg/datatx/manager/rclone/repository/json/json.go @@ -0,0 +1,171 @@ +// Copyright 2018-2023 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package json + +import ( + "encoding/json" + "io" + "os" + "sync" + + "github.com/cs3org/reva/pkg/datatx/manager/rclone/repository" + "github.com/cs3org/reva/pkg/datatx/manager/rclone/repository/registry" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" +) + +func init() { + registry.Register("json", New) +} + +type config struct { + File string `mapstructure:"file"` +} + +type mgr struct { + config *config + sync.Mutex // concurrent access to the file + model *rcloneJobsModel +} + +type rcloneJobsModel struct { + RcloneJobs map[string]*repository.Job `json:"rcloneJobs"` +} + +func parseConfig(m map[string]interface{}) (*config, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "rclone repository json driver: error decoding configuration") + return nil, err + } + return c, nil +} + +func (c *config) init() { + if c.File == "" { + c.File = "/var/tmp/reva/transfer-jobs.json" + } +} + +// New returns a json storage driver. 
+func New(m map[string]interface{}) (repository.Repository, error) { + c, err := parseConfig(m) + if err != nil { + return nil, err + } + c.init() + + model, err := loadOrCreate(c.File) + if err != nil { + err = errors.Wrap(err, "rclone repository json driver: error loading the file containing the transfer shares") + return nil, err + } + + mgr := &mgr{ + config: c, + model: model, + } + + return mgr, nil +} + +func (m *mgr) StoreJob(job *repository.Job) error { + m.Lock() + defer m.Unlock() + + m.model.RcloneJobs[job.TransferID] = job + err := m.saveModel() + if err != nil { + return errors.Wrap(err, "error storing jobs") + } + + return nil +} + +func (m *mgr) GetJob(transferID string) (*repository.Job, error) { + m.Lock() + defer m.Unlock() + + job, ok := m.model.RcloneJobs[transferID] + if !ok { + return nil, errors.New("rclone repository json driver: error getting job: not found") + } + return job, nil +} + +func (m *mgr) DeleteJob(job *repository.Job) error { + m.Lock() + defer m.Unlock() + + delete(m.model.RcloneJobs, job.TransferID) + if err := m.saveModel(); err != nil { + return errors.New("rclone repository json driver: error deleting job: error updating model") + } + return nil +} + +func (m *mgr) saveModel() error { + data, err := json.Marshal(m.model) + if err != nil { + err = errors.Wrap(err, "rclone repository json driver: error encoding job data to json") + return err + } + + if err := os.WriteFile(m.config.File, data, 0644); err != nil { + err = errors.Wrap(err, "rclone repository json driver: error writing job data to file: "+m.config.File) + return err + } + + return nil +} + +func loadOrCreate(file string) (*rcloneJobsModel, error) { + _, err := os.Stat(file) + if os.IsNotExist(err) { + if err := os.WriteFile(file, []byte("{}"), 0700); err != nil { + err = errors.Wrap(err, "rclone repository json driver: error creating the jobs storage file: "+file) + return nil, err + } + } + + fd, err := os.OpenFile(file, os.O_CREATE, 0644) + if err != nil { + err = errors.Wrap(err, "rclone repository json driver: error opening the jobs storage file: "+file) + return nil, err + } + defer fd.Close() + + data, err := io.ReadAll(fd) + if err != nil { + err = errors.Wrap(err, "rclone repository json driver: error reading the data") + return nil, err + } + + model := &rcloneJobsModel{} + if err := json.Unmarshal(data, model); err != nil { + err = errors.Wrap(err, "rclone repository json driver: error decoding jobs data to json") + return nil, err + } + + if model.RcloneJobs == nil { + model.RcloneJobs = make(map[string]*repository.Job) + } + + return model, nil +} diff --git a/pkg/datatx/manager/rclone/repository/registry/registry.go b/pkg/datatx/manager/rclone/repository/registry/registry.go new file mode 100644 index 0000000000..113586f3ad --- /dev/null +++ b/pkg/datatx/manager/rclone/repository/registry/registry.go @@ -0,0 +1,36 @@ +// Copyright 2018-2023 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package registry + +import ( + "github.com/cs3org/reva/pkg/datatx/manager/rclone/repository" +) + +// NewFunc is the function that rclone repository implementations +// should register at init time. +type NewFunc func(map[string]interface{}) (repository.Repository, error) + +// NewFuncs is a map containing all the registered datatx backends. +var NewFuncs = map[string]NewFunc{} + +// Register registers a new datatx backend new function. +// Not safe for concurrent use. Safe for use from package init. +func Register(name string, f NewFunc) { + NewFuncs[name] = f +} diff --git a/pkg/datatx/manager/rclone/repository/repository.go b/pkg/datatx/manager/rclone/repository/repository.go new file mode 100644 index 0000000000..fd2bcbe927 --- /dev/null +++ b/pkg/datatx/manager/rclone/repository/repository.go @@ -0,0 +1,44 @@ +// Copyright 2018-2023 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package repository + +import ( + datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" +) + +// Job represents transfer job. +type Job struct { + TransferID string + JobID int64 + TransferStatus datatx.Status + SrcToken string + SrcRemote string + SrcPath string + DestToken string + DestRemote string + DestPath string + Ctime string +} + +// Repository the interface that any storage driver should implement. +type Repository interface { + StoreJob(job *Job) error + GetJob(transferID string) (*Job, error) + DeleteJob(job *Job) error +} diff --git a/pkg/datatx/repository/json/json.go b/pkg/datatx/repository/json/json.go new file mode 100644 index 0000000000..7fcf18cb78 --- /dev/null +++ b/pkg/datatx/repository/json/json.go @@ -0,0 +1,211 @@ +// Copyright 2018-2023 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+
+package json
+
+import (
+	"encoding/json"
+	"io"
+	"os"
+	"sync"
+
+	userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
+	txv1beta "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
+	"github.com/cs3org/reva/pkg/datatx"
+	"github.com/cs3org/reva/pkg/datatx/repository/registry"
+	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
+)
+
+func init() {
+	registry.Register("json", New)
+}
+
+type config struct {
+	File string `mapstructure:"file"`
+}
+
+type mgr struct {
+	config *config
+	sync.Mutex // concurrent access to the file
+	model *transfersModel
+}
+
+type transfersModel struct {
+	Transfers map[string]*datatx.Transfer `json:"transfers"`
+}
+
+func parseConfig(m map[string]interface{}) (*config, error) {
+	c := &config{}
+	if err := mapstructure.Decode(m, c); err != nil {
+		err = errors.Wrap(err, "datatx repository json driver: error decoding configuration")
+		return nil, err
+	}
+	return c, nil
+}
+
+func (c *config) init() {
+	if c.File == "" {
+		c.File = "/var/tmp/reva/datatx-transfers.json"
+	}
+}
+
+// New returns a json storage driver.
+func New(m map[string]interface{}) (datatx.Repository, error) {
+	c, err := parseConfig(m)
+	if err != nil {
+		return nil, err
+	}
+	c.init()
+
+	model, err := loadOrCreate(c.File)
+	if err != nil {
+		err = errors.Wrap(err, "datatx repository json driver: error loading the file containing the transfers")
+		return nil, err
+	}
+
+	mgr := &mgr{
+		config: c,
+		model:  model,
+	}
+
+	return mgr, nil
+}
+
+func (m *mgr) StoreTransfer(transfer *datatx.Transfer) error {
+	m.Lock()
+	defer m.Unlock()
+
+	m.model.Transfers[transfer.TxID] = transfer
+	err := m.saveModel()
+	if err != nil {
+		return errors.Wrap(err, "error storing transfer")
+	}
+
+	return nil
+}
+
+func (m *mgr) DeleteTransfer(transfer *datatx.Transfer) error {
+	m.Lock()
+	defer m.Unlock()
+
+	delete(m.model.Transfers, transfer.TxID)
+	if err := m.saveModel(); err != nil {
+		return errors.New("datatx repository json driver: error deleting transfer: error updating model")
+	}
+	return nil
+}
+
+func (m *mgr) GetTransfer(txID string) (*datatx.Transfer, error) {
+	m.Lock()
+	defer m.Unlock()
+
+	transfer, ok := m.model.Transfers[txID]
+	if !ok {
+		return nil, errors.New("datatx repository json driver: error getting transfer: not found")
+	}
+	return transfer, nil
+}
+
+func (m *mgr) ListTransfers(filters []*txv1beta.ListTransfersRequest_Filter, userID *userv1beta1.UserId) ([]*datatx.Transfer, error) {
+	m.Lock()
+	defer m.Unlock()
+
+	var transfers []*datatx.Transfer
+	if userID == nil {
+		return transfers, errors.New("datatx repository json driver: error listing transfers, userID must be provided")
+	}
+	for _, transfer := range m.model.Transfers {
+		if transfer.UserID.OpaqueId == userID.OpaqueId {
+			if len(filters) == 0 {
+				transfers = append(transfers, &datatx.Transfer{
+					TxID:          transfer.TxID,
+					SrcTargetURI:  transfer.SrcTargetURI,
+					DestTargetURI: transfer.DestTargetURI,
+					ShareID:       transfer.ShareID,
+					UserID:        transfer.UserID,
+				})
+			} else {
+				for _, f := range filters {
+					if f.Type == txv1beta.ListTransfersRequest_Filter_TYPE_SHARE_ID {
+						if f.GetShareId().GetOpaqueId() == transfer.ShareID {
+							transfers = append(transfers, &datatx.Transfer{
+								TxID:          transfer.TxID,
+								SrcTargetURI:  transfer.SrcTargetURI,
+								DestTargetURI: transfer.DestTargetURI,
+								ShareID:       transfer.ShareID,
+								UserID:        transfer.UserID,
+							})
+						}
+					}
+				}
+			}
+		}
+	}
+	return transfers, nil
+}
+
+func (m *mgr) saveModel() error {
+	data, err := json.Marshal(m.model)
+	if err != nil {
+		err = errors.Wrap(err, "datatx repository json driver: error encoding transfer data to json")
+		return err
+	}
+
+	if err := os.WriteFile(m.config.File, data, 0644); err != nil {
+		err = errors.Wrap(err, "datatx repository json driver: error writing transfer data to file: "+m.config.File)
+		return err
+	}
+
+	return nil
+}
+
+func loadOrCreate(file string) (*transfersModel, error) {
+	_, err := os.Stat(file)
+	if os.IsNotExist(err) {
+		if err := os.WriteFile(file, []byte("{}"), 0700); err != nil {
+			err = errors.Wrap(err, "datatx repository json driver: error creating the datatx transfers storage file: "+file)
+			return nil, err
+		}
+	}
+
+	fd, err := os.OpenFile(file, os.O_CREATE, 0644)
+	if err != nil {
+		err = errors.Wrap(err, "datatx repository json driver: error opening the datatx transfers storage file: "+file)
+		return nil, err
+	}
+	defer fd.Close()
+
+	data, err := io.ReadAll(fd)
+	if err != nil {
+		err = errors.Wrap(err, "datatx repository json driver: error reading the data")
+		return nil, err
+	}
+
+	model := &transfersModel{}
+	if err := json.Unmarshal(data, model); err != nil {
+		err = errors.Wrap(err, "datatx repository json driver: error decoding datatx transfers data to json")
+		return nil, err
+	}
+
+	if model.Transfers == nil {
+		model.Transfers = make(map[string]*datatx.Transfer)
+	}
+
+	return model, nil
+}
diff --git a/pkg/datatx/repository/registry/registry.go b/pkg/datatx/repository/registry/registry.go
new file mode 100644
index 0000000000..66dceb7d6a
--- /dev/null
+++ b/pkg/datatx/repository/registry/registry.go
@@ -0,0 +1,36 @@
+// Copyright 2018-2023 CERN
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// In applying this license, CERN does not waive the privileges and immunities
+// granted to it by virtue of its status as an Intergovernmental Organization
+// or submit itself to any jurisdiction.
+
+package registry
+
+import (
+	"github.com/cs3org/reva/pkg/datatx"
+)
+
+// NewFunc is the function that datatx repository implementations
+// should register at init time.
+type NewFunc func(map[string]interface{}) (datatx.Repository, error)
+
+// NewFuncs is a map containing all the registered datatx repository backends.
+var NewFuncs = map[string]NewFunc{}
+
+// Register registers a new datatx repository backend new function.
+// Not safe for concurrent use. Safe for use from package init.
+func Register(name string, f NewFunc) {
+	NewFuncs[name] = f
+}
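Illustrative usage, not part of the change set above: the registries introduced here follow reva's usual driver-registry pattern, where a driver registers its constructor under a name in init() and a consumer resolves the configured driver name to that constructor at startup. The sketch below shows that wiring for the datatx repository only; the package name, the helper getDatatxRepository, and the example config values are hypothetical and serve purely to show how NewFuncs, the json driver's registration, and its "file" option fit together.

package example // illustrative sketch, assuming the packages added in this diff

import (
	"fmt"

	"github.com/cs3org/reva/pkg/datatx"
	"github.com/cs3org/reva/pkg/datatx/repository/registry"

	// Blank import so the json driver's init() runs and calls registry.Register("json", New).
	_ "github.com/cs3org/reva/pkg/datatx/repository/json"
)

// getDatatxRepository (hypothetical helper) resolves the configured driver
// name against the registry and builds the repository from the raw config
// map, e.g. {"file": "/var/tmp/reva/datatx-transfers.json"}.
func getDatatxRepository(driver string, c map[string]interface{}) (datatx.Repository, error) {
	f, ok := registry.NewFuncs[driver]
	if !ok {
		return nil, fmt.Errorf("datatx repository driver not found: %s", driver)
	}
	return f(c)
}

The rclone job repository registry under pkg/datatx/manager/rclone/repository/registry is presumably consumed the same way by the rclone transfer driver, with its own driver name and config section.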