From 24a212361ab0b701f96b8518992e60d214850327 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Raddaoui=20Mar=C3=ADn?= Date: Tue, 5 Dec 2023 16:40:40 +0100 Subject: [PATCH] Fix AM transfer deletion The SFTP client uses a `remoteDir` config value that is used as prefix in the `Upload()` and `Delete()` functions. Using the remote path from the upload transfer result duplicates that prefix and makes the deletion to fail with a "file does not exist" error. Return also the remote relative path from the upload activity and use it as destination for the delete activity. Refs #785. --- internal/am/upload_transfer.go | 13 +++++++++---- internal/am/upload_transfer_test.go | 5 +++-- internal/workflow/processing.go | 4 ++-- internal/workflow/processing_test.go | 2 +- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/internal/am/upload_transfer.go b/internal/am/upload_transfer.go index d1f192c2..a0077a60 100644 --- a/internal/am/upload_transfer.go +++ b/internal/am/upload_transfer.go @@ -19,7 +19,10 @@ type UploadTransferActivityParams struct { type UploadTransferActivityResult struct { BytesCopied int64 - RemotePath string + // Full path including `remoteDir` config path. + RemoteFullPath string + // Relative path to the `remoteDir` config path. + RemoteRelativePath string } type UploadTransferActivity struct { @@ -40,13 +43,15 @@ func (a *UploadTransferActivity) Execute(ctx context.Context, params *UploadTran } defer src.Close() - bytes, path, err := a.client.Upload(ctx, src, filepath.Base(params.SourcePath)) + filename := filepath.Base(params.SourcePath) + bytes, path, err := a.client.Upload(ctx, src, filename) if err != nil { return nil, fmt.Errorf("%s: %v", UploadTransferActivityName, err) } return &UploadTransferActivityResult{ - BytesCopied: bytes, - RemotePath: path, + BytesCopied: bytes, + RemoteFullPath: path, + RemoteRelativePath: filename, }, nil } diff --git a/internal/am/upload_transfer_test.go b/internal/am/upload_transfer_test.go index d55a4e06..a781ec24 100644 --- a/internal/am/upload_transfer_test.go +++ b/internal/am/upload_transfer_test.go @@ -46,8 +46,9 @@ func TestUploadTransferActivity(t *testing.T) { ).Return(int64(14), "/transfer_dir/"+filename, nil) }, want: am.UploadTransferActivityResult{ - BytesCopied: int64(14), - RemotePath: "/transfer_dir/" + filename, + BytesCopied: int64(14), + RemoteFullPath: "/transfer_dir/" + filename, + RemoteRelativePath: filename, }, }, { diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index a6335e0e..a4ac4f03 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -686,7 +686,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti am.StartTransferActivityName, &am.StartTransferActivityParams{ Name: tinfo.req.Key, - Path: uploadResult.RemotePath, + Path: uploadResult.RemoteFullPath, }, ).Get(activityOpts, &transferResult) if err != nil { @@ -738,7 +738,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti // Delete transfer. activityOpts = withActivityOptsForRequest(sessCtx) err = temporalsdk_workflow.ExecuteActivity(activityOpts, am.DeleteTransferActivityName, am.DeleteTransferActivityParams{ - Destination: uploadResult.RemotePath, + Destination: uploadResult.RemoteRelativePath, }).Get(activityOpts, nil) if err != nil { return err diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index e2654263..c0203f02 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -245,7 +245,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { ).Return(&activities.ZipActivityResult{Path: "/tmp/transfer.zip"}, nil) s.env.OnActivity(am.UploadTransferActivityName, sessionCtx, &am.UploadTransferActivityParams{SourcePath: "/tmp/transfer.zip"}, - ).Return(&am.UploadTransferActivityResult{RemotePath: "transfer.zip"}, nil).Once() + ).Return(&am.UploadTransferActivityResult{RemoteFullPath: "transfer.zip", RemoteRelativePath: "transfer.zip"}, nil).Once() s.env.OnActivity(am.StartTransferActivityName, sessionCtx, &am.StartTransferActivityParams{Name: key, Path: "transfer.zip"}, ).Return(&am.StartTransferActivityResult{TransferID: transferID.String()}, nil).Once()