Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
Send failed SIPs to a different bucket.
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielCosme authored and jraddaoui committed Dec 1, 2023
1 parent 8225f33 commit 01de96f
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 16 deletions.
18 changes: 16 additions & 2 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func main() {
// Set-up failed transfers bucket.
var failedTransfersBucket *blob.Bucket
{
failedLocation, err := storage.NewInternalLocation(&cfg.Failed)
failedLocation, err := storage.NewInternalLocation(&cfg.FailedTransfers)
if err != nil {
logger.Error(err, "Error setting up failed transfers location.")
os.Exit(1)
Expand All @@ -116,6 +116,20 @@ func main() {
os.Exit(1)
}
}
// Set-up failed sip bucket.
var failedSipsBucket *blob.Bucket
{
failedLocation, err := storage.NewInternalLocation(&cfg.FailedSips)
if err != nil {
logger.Error(err, "Error setting up failed transfers location.")
os.Exit(1)
}
failedSipsBucket, err = failedLocation.OpenBucket(ctx)
if err != nil {
logger.Error(err, "Error getting failed transfers bucket.")
os.Exit(1)
}
}

var g run.Group

Expand Down Expand Up @@ -177,7 +191,7 @@ func main() {
w.RegisterActivityWithOptions(sfa_activities.NewAllowedFileFormatsActivity().Execute, temporalsdk_activity.RegisterOptions{Name: sfa_activities.AllowedFileFormatsName})
w.RegisterActivityWithOptions(sfa_activities.NewMetadataValidationActivity().Execute, temporalsdk_activity.RegisterOptions{Name: sfa_activities.MetadataValidationName})
w.RegisterActivityWithOptions(sfa_activities.NewSipCreationActivity().Execute, temporalsdk_activity.RegisterOptions{Name: sfa_activities.SipCreationName})
w.RegisterActivityWithOptions(sfa_activities.NewSendToFailedBuckeActivity(failedTransfersBucket).Execute, temporalsdk_activity.RegisterOptions{Name: sfa_activities.SendToFailedBucketName})
w.RegisterActivityWithOptions(sfa_activities.NewSendToFailedBuckeActivity(failedTransfersBucket, failedSipsBucket).Execute, temporalsdk_activity.RegisterOptions{Name: sfa_activities.SendToFailedBucketName})
// Archivematica activities
w.RegisterActivityWithOptions(activities.NewZipActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.ZipActivityName})

Expand Down
10 changes: 9 additions & 1 deletion enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,22 @@ secret = "minio123"
region = "us-west-1"
bucket = "aips"

[failed]
[failedtransfers]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
key = "minio"
secret = "minio123"
region = "us-west-1"
bucket = "failed-transfers"

[failedsips]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
key = "minio"
secret = "minio123"
region = "us-west-1"
bucket = "failed-sips"

# Change the taskqueue setting to your prefered preservation system, by default it is a3m.
[preservation]
taskqueue = "a3m"
Expand Down
1 change: 1 addition & 0 deletions hack/kube/base/minio-setup-buckets-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ spec:
"mc alias set enduro http://minio.enduro-sdps:9000 ${MINIO_USER} ${MINIO_PASSWORD};
mc mb enduro/sips --ignore-existing;
mc mb enduro/failed-transfers --ignore-existing;
mc mb enduro/failed-sips --ignore-existing;
mc mb enduro/aips --ignore-existing;
mc mb enduro/perma-aips-1 --ignore-existing;
mc mb enduro/perma-aips-2 --ignore-existing;
Expand Down
1 change: 1 addition & 0 deletions hack/kube/tools/minio-recreate-buckets-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ spec:
mc rb --force --dangerous enduro;
mc mb enduro/sips --ignore-existing;
mc mb enduro/failed-transfers --ignore-existing;
mc mb enduro/failed-sips --ignore-existing;
mc mb enduro/aips --ignore-existing;
mc mb enduro/perma-aips-1 --ignore-existing;
mc mb enduro/perma-aips-2 --ignore-existing;
Expand Down
3 changes: 2 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ type Configuration struct {
Upload upload.Config
Watcher watcher.Config

Failed storage.LocationConfig
FailedTransfers storage.LocationConfig
FailedSips storage.LocationConfig
}

func (c Configuration) Validate() error {
Expand Down
27 changes: 19 additions & 8 deletions internal/sfa/activities/SendToFailedBucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,25 @@ import (
)

const SendToFailedBucketName = "send-to-failed-bucket"
const FailureSIP = "sip"
const FailureTransfer = "transfer"

type SendToFailedBucketActivity struct {
failedBucket *blob.Bucket
failedTransferBucket *blob.Bucket
failedSipBucket *blob.Bucket
}

func NewSendToFailedBuckeActivity(bucket *blob.Bucket) *SendToFailedBucketActivity {
return &SendToFailedBucketActivity{failedBucket: bucket}
func NewSendToFailedBuckeActivity(transfer, sip *blob.Bucket) *SendToFailedBucketActivity {
return &SendToFailedBucketActivity{
failedTransferBucket: transfer,
failedSipBucket: sip,
}
}

type SendToFailedBucketParams struct {
Path string
Key string
FailureType string
Path string
Key string
}

type SendToFailedBucketResult struct {
Expand All @@ -34,9 +41,13 @@ func (sf *SendToFailedBucketActivity) Execute(ctx context.Context, params *SendT
}
res.FailedKey = "Failed_" + params.Key

if err := sf.failedBucket.Upload(ctx, res.FailedKey, f, &blob.WriterOptions{
ContentType: "application/octet-stream",
}); err != nil {
switch params.FailureType {
case FailureTransfer:
err = sf.failedTransferBucket.Upload(ctx, res.FailedKey, f, &blob.WriterOptions{ContentType: "application/octet-stream"})
case FailureSIP:
err = sf.failedSipBucket.Upload(ctx, res.FailedKey, f, &blob.WriterOptions{ContentType: "application/octet-stream"})
}
if err != nil {
return nil, err
}

Expand Down
18 changes: 16 additions & 2 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,9 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
if PreProcessingErr != nil {
var sendToFailedRes sfa_activities.SendToFailedBucketResult
err = temporalsdk_workflow.ExecuteActivity(preProcCtx, sfa_activities.SendToFailedBucketName, &sfa_activities.SendToFailedBucketParams{
Path: tinfo.TempFile,
Key: tinfo.req.Key,
FailureType: sfa_activities.FailureTransfer,
Path: tinfo.TempFile,
Key: tinfo.req.Key,
}).Get(sessCtx, &sendToFailedRes)
if err != nil {
return err
Expand Down Expand Up @@ -734,6 +735,19 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti
if err != nil {
return err
}
//w.cleanUpPath(zipResult.Path) // Delete when workflow completes.

defer func() {
if err != nil {
var sendToFailedRes sfa_activities.SendToFailedBucketResult
bucketErr := temporalsdk_workflow.ExecuteActivity(activityOpts, sfa_activities.SendToFailedBucketName, &sfa_activities.SendToFailedBucketParams{
FailureType: sfa_activities.FailureSIP,
Path: zipResult.Path,
Key: tinfo.req.Key,
}).Get(sessCtx, &sendToFailedRes)
errors.Join(err, bucketErr)
}
}()

// Upload transfer to AMSS.
activityOpts = temporalsdk_workflow.WithActivityOptions(sessCtx,
Expand Down
5 changes: 3 additions & 2 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
pkgsvc := packagefake.NewMockService(ctrl)
wsvc := watcherfake.NewMockService(ctrl)
sftpc := sftp_fake.NewMockClient(ctrl)
failedBucket := watcherfake.OpenTestFileBucket(s.T())
failedBucketTransfers := watcherfake.OpenTestFileBucket(s.T())
failedBucketSips := watcherfake.OpenTestFileBucket(s.T())

s.env.RegisterActivityWithOptions(activities.NewDownloadActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
s.env.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
Expand All @@ -70,7 +71,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
s.env.RegisterActivityWithOptions(sfa_activities.NewAllowedFileFormatsActivity().Execute, temporalsdk_activity.RegisterOptions{Name: sfa_activities.AllowedFileFormatsName})
s.env.RegisterActivityWithOptions(sfa_activities.NewMetadataValidationActivity().Execute, temporalsdk_activity.RegisterOptions{Name: sfa_activities.MetadataValidationName})
s.env.RegisterActivityWithOptions(sfa_activities.NewSipCreationActivity().Execute, temporalsdk_activity.RegisterOptions{Name: sfa_activities.SipCreationName})
s.env.RegisterActivityWithOptions(sfa_activities.NewSendToFailedBuckeActivity(failedBucket).Execute, temporalsdk_activity.RegisterOptions{Name: sfa_activities.SendToFailedBucketName})
s.env.RegisterActivityWithOptions(sfa_activities.NewSendToFailedBuckeActivity(failedBucketTransfers, failedBucketSips).Execute, temporalsdk_activity.RegisterOptions{Name: sfa_activities.SendToFailedBucketName})

// Archivematica activities
s.env.RegisterActivityWithOptions(
Expand Down

0 comments on commit 01de96f

Please sign in to comment.