Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data transfers new ocm impl #3847

Merged
merged 3 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/unreleased/datatx-new-ocm-impl.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Enhancement: Update data transfers for current OCM shares implementation

https://github.com/cs3org/reva/pull/3847
https://github.com/cs3org/reva/issues/3846
28 changes: 28 additions & 0 deletions cmd/reva/ocm-share-update-received.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)
Expand All @@ -33,9 +34,11 @@ func ocmShareUpdateReceivedCommand() *command {
cmd.Description = func() string { return "update a received OCM share" }
cmd.Usage = func() string { return "Usage: ocm-share-update-received [-flags] <share_id>" }
state := cmd.String("state", "pending", "the state of the share (pending, accepted or rejected)")
path := cmd.String("path", "", "the destination path of the data transfer (ignored if this is not a transfer type share)")

cmd.ResetFlags = func() {
*state = "pending"
*path = ""
}

cmd.Action = func(w ...io.Writer) error {
Expand Down Expand Up @@ -75,9 +78,25 @@ func ocmShareUpdateReceivedCommand() *command {
}
shareRes.Share.State = shareState

// check if we are dealing with a transfer in case the destination path needs to be set
_, ok := getTransferProtocol(shareRes.Share)
var opaque *typesv1beta1.Opaque
if ok {
// transfer_destination_path is not part of TransferProtocol and is specified as an opaque field
opaque = &typesv1beta1.Opaque{
Map: map[string]*typesv1beta1.OpaqueEntry{
"transfer_destination_path": {
Decoder: "plain",
Value: []byte(*path),
},
},
}
}

shareRequest := &ocm.UpdateReceivedOCMShareRequest{
Share: shareRes.Share,
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"state"}},
Opaque: opaque,
}

updateRes, err := shareClient.UpdateReceivedOCMShare(ctx, shareRequest)
Expand All @@ -95,6 +114,15 @@ func ocmShareUpdateReceivedCommand() *command {
return cmd
}

func getTransferProtocol(share *ocm.ReceivedShare) (*ocm.TransferProtocol, bool) {
for _, p := range share.Protocols {
if d, ok := p.Term.(*ocm.Protocol_TransferOptions); ok {
return d.TransferOptions, true
}
}
return nil, false
}

func getOCMShareState(state string) ocm.ShareState {
switch state {
case "pending":
Expand Down
86 changes: 47 additions & 39 deletions examples/datatx/datatx.toml
Original file line number Diff line number Diff line change
@@ -1,39 +1,47 @@
# Example data transfer service configuration
[grpc.services.datatx]
# Rclone is the default data transfer driver
txdriver = "rclone"
# The shares,transfers db file (default: /var/tmp/reva/datatx-shares.json)
tx_shares_file = ""
# Base folder of the data transfers (default: /home/DataTransfers)
data_transfers_folder = ""

# Rclone data transfer driver
[grpc.services.datatx.txdrivers.rclone]
# Rclone endpoint
endpoint = "http://..."
# Basic auth is used
auth_user = "...rcloneuser"
auth_pass = "...rcloneusersecret"
# The authentication scheme to use in the src and dest requests by rclone (follows the endpoints' authentication methods)
# Valid values:
# "bearer" (default) will result in rclone using request header: Authorization: "Bearer ...token..."
# "x-access-token" will result in rclone using request header: X-Access-Token: "...token..."
# If not set "bearer" is assumed
auth_header = "x-access-token"
# The transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json)
file = ""
# Check status job interval in milliseconds
job_status_check_interval = 2000
# The job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000

[http.services.ocdav]
# Rclone supports third-party copy push; for that to work with reva enable this setting
enable_http_tpc = true
# The authentication scheme reva uses for the tpc push call (the call to Destination).
# Follows the destination endpoint authentication method.
# Valid values:
# "bearer" (default) will result in header: Authorization: "Bearer ...token..."
# "x-access-token" will result in header: X-Access-Token: "...token..."
# If not set "bearer" is assumed
http_tpc_push_auth_header = "x-access-token"
# all relevant settings for data transfers

[grpc.services.gateway]
datatx = "localhost:19000"
# base folder of the data transfers (eg. /home/DataTransfers)
data_transfers_folder = ""


[grpc.services.datatx]
# rclone is currently the only data transfer driver implementation
txdriver = "rclone"
# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json)
tx_shares_file = ""
# base folder of the data transfers (eg. /home/DataTransfers)
data_transfers_folder = ""

# rclone driver
[grpc.services.datatx.txdrivers.rclone]
# rclone endpoint
endpoint = "http://..."
# Basic auth is used for authenticating with rclone
auth_user = "{rclone user}"
auth_pass = "{rclone user secret}"
# The authentication scheme to use in the src and dest requests by rclone (follows the endpoints' authentication methods)
# Valid values:
# "bearer" (default) will result in rclone using request header: Authorization: "Bearer ...token..."
# "x-access-token" will result in rclone using request header: X-Access-Token: "...token..."
# If not set "bearer" is assumed
auth_header = "x-access-token"
# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json)
file = ""
# check status job interval in milliseconds
job_status_check_interval = 2000
# the job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000

[http.services.ocdav]
# reva supports http third party copy
enable_http_tpc = true
# with rclone reva only supports http tpc push (ie. with the destination header specified)
# The authentication scheme reva uses for the tpc push call (the call to Destination).
# Follows the destination endpoint authentication method.
# Valid values:
# "bearer" (default) will result in header: Authorization: "Bearer ...token..."
# "x-access-token" will result in header: X-Access-Token: "...token..."
# If not set "bearer" is assumed
http_tpc_push_auth_header = "x-access-token"
25 changes: 17 additions & 8 deletions internal/grpc/services/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type config struct {
TxDriver string `mapstructure:"txdriver"`
TxDrivers map[string]map[string]interface{} `mapstructure:"txdrivers"`
// storage driver to persist share/transfer relation
StorageDriver string `mapstructure:"storage_driver"`
StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"`
TxSharesFile string `mapstructure:"tx_shares_file"`
DataTransfersFolder string `mapstructure:"data_transfers_folder"`
StorageDriver string `mapstructure:"storage_driver"`
StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"`
TxSharesFile string `mapstructure:"tx_shares_file"`
RemoveOnCancel bool `mapstructure:"remove_on_cancel"`
}

type service struct {
Expand Down Expand Up @@ -81,9 +81,6 @@ func (c *config) init() {
if c.TxSharesFile == "" {
c.TxSharesFile = "/var/tmp/reva/datatx-shares.json"
}
if c.DataTransfersFolder == "" {
c.DataTransfersFolder = "/home/DataTransfers"
}
}

func (s *service) Register(ss *grpc.Server) {
Expand Down Expand Up @@ -211,10 +208,22 @@ func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransfer
return nil, errtypes.InternalError("datatx service: transfer not found")
}

transferRemovedMessage := ""
if s.conf.RemoveOnCancel {
delete(s.txShareDriver.model.TxShares, req.TxId.GetOpaqueId())
if err := s.txShareDriver.model.saveTxShare(); err != nil {
err = errors.Wrap(err, "datatx service: error deleting transfer: "+datatx.Status_STATUS_INVALID.String())
return &datatx.CancelTransferResponse{
Status: status.NewInvalid(ctx, "error cancelling transfer"),
}, err
}
transferRemovedMessage = "transfer successfully removed"
}

txInfo, err := s.txManager.CancelTransfer(ctx, req.GetTxId().OpaqueId)
if err != nil {
txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID}
err = errors.Wrap(err, "datatx service: error cancelling transfer")
err = errors.Wrapf(err, "(%v) datatx service: error cancelling transfer", transferRemovedMessage)
return &datatx.CancelTransferResponse{
Status: status.NewInternal(ctx, err, "error cancelling transfer"),
TxInfo: txInfo,
Expand Down
4 changes: 0 additions & 4 deletions internal/grpc/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ func (c *config) init() {

c.ShareFolder = strings.Trim(c.ShareFolder, "/")

if c.DataTransfersFolder == "" {
c.DataTransfersFolder = "DataTransfers"
}

if c.TokenManager == "" {
c.TokenManager = "jwt"
}
Expand Down
Loading