Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
Fix AM transfer deletion
Browse files Browse the repository at this point in the history
The SFTP client uses a `remoteDir` config value that is used as prefix
in the `Upload()` and `Delete()` functions. Using the remote path from
the upload transfer result duplicates that prefix and makes the deletion
to fail with a "file does not exist" error.

Return also the remote relative path from the upload activity and use it
as destination for the delete activity.

Refs artefactual-sdps#785.
  • Loading branch information
jraddaoui committed Dec 5, 2023
1 parent c98ce2c commit 24a2123
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 9 deletions.
13 changes: 9 additions & 4 deletions internal/am/upload_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ type UploadTransferActivityParams struct {

type UploadTransferActivityResult struct {
BytesCopied int64
RemotePath string
// Full path including `remoteDir` config path.
RemoteFullPath string
// Relative path to the `remoteDir` config path.
RemoteRelativePath string
}

type UploadTransferActivity struct {
Expand All @@ -40,13 +43,15 @@ func (a *UploadTransferActivity) Execute(ctx context.Context, params *UploadTran
}
defer src.Close()

bytes, path, err := a.client.Upload(ctx, src, filepath.Base(params.SourcePath))
filename := filepath.Base(params.SourcePath)
bytes, path, err := a.client.Upload(ctx, src, filename)
if err != nil {
return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err)
}

return &UploadTransferActivityResult{
BytesCopied: bytes,
RemotePath: path,
BytesCopied: bytes,
RemoteFullPath: path,
RemoteRelativePath: filename,
}, nil
}
5 changes: 3 additions & 2 deletions internal/am/upload_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func TestUploadTransferActivity(t *testing.T) {
).Return(int64(14), "/transfer_dir/"+filename, nil)
},
want: am.UploadTransferActivityResult{
BytesCopied: int64(14),
RemotePath: "/transfer_dir/" + filename,
BytesCopied: int64(14),
RemoteFullPath: "/transfer_dir/" + filename,
RemoteRelativePath: filename,
},
},
{
Expand Down
4 changes: 2 additions & 2 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti
am.StartTransferActivityName,
&am.StartTransferActivityParams{
Name: tinfo.req.Key,
Path: uploadResult.RemotePath,
Path: uploadResult.RemoteFullPath,
},
).Get(activityOpts, &transferResult)
if err != nil {
Expand Down Expand Up @@ -738,7 +738,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti
// Delete transfer.
activityOpts = withActivityOptsForRequest(sessCtx)
err = temporalsdk_workflow.ExecuteActivity(activityOpts, am.DeleteTransferActivityName, am.DeleteTransferActivityParams{
Destination: uploadResult.RemotePath,
Destination: uploadResult.RemoteRelativePath,
}).Get(activityOpts, nil)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() {
).Return(&activities.ZipActivityResult{Path: "/tmp/transfer.zip"}, nil)
s.env.OnActivity(am.UploadTransferActivityName,
sessionCtx, &am.UploadTransferActivityParams{SourcePath: "/tmp/transfer.zip"},
).Return(&am.UploadTransferActivityResult{RemotePath: "transfer.zip"}, nil).Once()
).Return(&am.UploadTransferActivityResult{RemoteFullPath: "transfer.zip", RemoteRelativePath: "transfer.zip"}, nil).Once()
s.env.OnActivity(am.StartTransferActivityName,
sessionCtx, &am.StartTransferActivityParams{Name: key, Path: "transfer.zip"},
).Return(&am.StartTransferActivityResult{TransferID: transferID.String()}, nil).Once()
Expand Down

0 comments on commit 24a2123

Please sign in to comment.