Skip to content

Commit

Permalink
Reduce preservation tasks created from AM jobs
Browse files Browse the repository at this point in the history
Create preservation tasks from only a subset of the Archivematica transfer
and ingest jobs. Based on the initial data and the UUIDs used in the
default `workflow.json` chains from the Archivematica source code, define
the map of jobs we want to keep, keyed by their `LinkID`.
  • Loading branch information
jraddaoui committed Nov 12, 2024
1 parent cfb6643 commit 92feb35
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 66 deletions.
65 changes: 57 additions & 8 deletions internal/am/job_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,53 @@ import (
"github.com/artefactual-sdps/enduro/internal/package_"
)

// keepJobs is the set of Archivematica job LinkIDs that are allowed to become
// preservation tasks: filterJobs drops every job whose LinkID is not a key in
// this map. The values are empty structs, so the map is used purely for
// membership checks. The trailing comment on each entry is the name of the
// Archivematica job that the LinkID identifies.
var keepJobs = map[string]struct{}{
// Transfer jobs.
"3229e01f-adf3-4294-85f7-4acb01b3fbcf": {}, // Extract zipped bag transfer
"154dd501-a344-45a9-97e3-b30093da35f5": {}, // Rename with transfer UUID
"3c526a07-c3b8-4e53-801b-7f3d0c4857a5": {}, // Assign file UUIDs to objects
"c77fee8c-7c4e-4871-a72e-94d499994869": {}, // Assign checksums and file sizes to objects
"5415c813-3637-49ab-afec-9b435c2e4d2c": {}, // Assign UUIDs to directories
"1c2550f1-3fc0-45d8-8bc4-4c06d720283b": {}, // Scan for viruses in directories
"56eebd45-5600-4768-a8c2-ec0114555a3d": {}, // Generate transfer structure report
"2584b25c-8d98-44b7-beca-2b3ea2ea2505": {}, // Change object and directory filenames
"a329d39b-4711-4231-b54e-b5958934dccb": {}, // Change transfer name
"2522d680-c7d9-4d06-8b11-a28d8bd8a71f": {}, // Identify file format
"1a136608-ae7b-42b4-bf2f-de0e514cfd47": {}, // Load rights
"303a65f6-a16f-4a06-807b-cb3425a30201": {}, // Characterize and extract metadata
"d51a7ed8-9cf4-424b-9671-85fd8b5b95aa": {}, // Load PREMIS events from metadata/premis.xml
"a536828c-be65-4088-80bd-eb511a0a063d": {}, // Validate formats
"307edcde-ad10-401c-92c4-652917c993ed": {}, // Generate METS.xml document
"675acd22-828d-4949-adc7-1888240f5e3d": {}, // Parse external METS
// Ingest jobs.
"15a2df8a-7b45-4c11-b6fa-884c9b7e5c67": {}, // Identify manually normalized files
"5b0042a2-2244-475c-85d5-41e4b11e65d6": {}, // Validate preservation derivatives
"78b7adff-861d-4450-b6dd-3fabe96a849e": {}, // Check for manual normalized files
"47dd6ea6-1ee7-4462-8b84-3fc4c1eeeb7f": {}, // Check for submission documentation
"2a62f025-83ec-4f23-adb4-11d5da7ad8c2": {}, // Assign checksums and file sizes to submissionDocumentation
"11033dbd-e4d4-4dd6-8bcf-48c424e222e3": {}, // Change file and directory names in submission documentation
"1ba589db-88d1-48cf-bb1a-a5f9d2b17378": {}, // Scan for viruses in submission documentation
"1dce8e21-7263-4cc4-aa59-968d9793b5f2": {}, // Identify file format
"33d7ac55-291c-43ae-bb42-f599ef428325": {}, // Characterize and extract metadata on submission documentation
"b0ffcd90-eb26-4caf-8fab-58572d205f04": {}, // Process JSON metadata
"e4b0c713-988a-4606-82ea-4b565936d9a7": {}, // Move metadata to objects directory
"dc9d4991-aefa-4d7e-b7b5-84e3c4336e74": {}, // Assign file UUIDs to metadata
"b6b0fe37-aa26-40bd-8be8-d3acebf3ccf8": {}, // Assign checksums and file sizes to metadata
"b21018df-f67d-469a-9ceb-ac92ac68654e": {}, // Change file and directory names in metadata
"8bc92801-4308-4e3b-885b-1a89fdcd3014": {}, // Scan for viruses in metadata
"b2444a6e-c626-4487-9abc-1556dd89a8ae": {}, // Identify file format of metadata files
"04493ab2-6cad-400d-8832-06941f121a96": {}, // Characterize and extract metadata on metadata files
"873b428f-2c86-42b6-b463-aeda925bf559": {}, // Load persistent identifiers from external file
"ccf8ec5c-3a9a-404a-a7e7-8f567d3b36a0": {}, // Generate METS.xml document
"523c97cc-b267-4cfb-8209-d99e523bf4b3": {}, // Add README file
"3e25bda6-5314-4bb4-aa1e-90900dce887d": {}, // Prepare AIP
"d55b42c8-c7c5-4a40-b626-d248d2bd883f": {}, // Compress AIP
"3f543585-fa4f-4099-9153-dd6d53572f5c": {}, // Verify AIP
"20515483-25ed-4133-b23e-5bb14cab8e22": {}, // Store the AIP
"48703fad-dc44-4c8e-8f47-933df3ef6179": {}, // Index AIP
"b7cf0d9a-504f-4f4e-9930-befa817d67ff": {}, // Clean up after storing AIP
}

var jobStatusToPreservationTaskStatus = map[amclient.JobStatus]enums.PreservationTaskStatus{
amclient.JobStatusUnknown: enums.PreservationTaskStatusUnspecified,
amclient.JobStatusComplete: enums.PreservationTaskStatusDone,
Expand Down Expand Up @@ -82,7 +129,7 @@ func (jt *JobTracker) list(ctx context.Context, unitID string) ([]amclient.Job,
// savePreservationTasks persists Archivematica jobs data as preservation tasks.
func (jt *JobTracker) savePreservationTasks(ctx context.Context, jobs []amclient.Job) (int, error) {
var count int
jobs = filterSavedJobs(jobs, jt.savedIDs)
jobs = jt.filterJobs(jobs)
for _, job := range jobs {
// Wait until a job is complete (or failed) before saving it.
if job.Status == amclient.JobStatusProcessing {
Expand All @@ -105,16 +152,18 @@ func (jt *JobTracker) savePreservationTasks(ctx context.Context, jobs []amclient
return count, nil
}

// filterSavedJobs filters out jobs that have an ID in saved then returns the
// filtered job list.
func filterSavedJobs(jobs []amclient.Job, saved map[string]struct{}) []amclient.Job {
var unsaved []amclient.Job
// filterJobs filters out jobs whose ID is already in jt.savedIDs and jobs
// whose LinkID is not in keepJobs, then returns the filtered job list.
func (jt *JobTracker) filterJobs(jobs []amclient.Job) []amclient.Job {
var filtered []amclient.Job
for _, job := range jobs {
if _, ok := saved[job.ID]; !ok {
unsaved = append(unsaved, job)
_, okSaved := jt.savedIDs[job.ID]
_, okKeep := keepJobs[job.LinkID]
if !okSaved && okKeep {
filtered = append(filtered, job)
}
}
return unsaved
return filtered
}

// ConvertJobToPreservationTask converts an amclient.Job to a
Expand Down
71 changes: 33 additions & 38 deletions internal/am/job_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func TestJobTracker(t *testing.T) {

paID := 1
unitID := uuid.New().String()
startedAt := time.Date(2024, time.January, 18, 1, 27, 49, 0, time.UTC)
completedAt := time.Date(2024, time.January, 18, 1, 27, 51, 0, time.UTC)

clock := clockwork.NewFakeClock()
httpError := func(m *amclienttest.MockJobsServiceMockRecorder, statusCode int) {
Expand All @@ -45,38 +47,26 @@ func TestJobTracker(t *testing.T) {

jobs := []amclient.Job{
{
ID: "f60018ac-da79-4769-9509-c6c41d5efe7e",
LinkID: "70669a5b-01e4-4ea0-ac70-10292f87da05",
Microservice: "Verify SIP compliance",
Name: "Move to processing directory",
Status: amclient.JobStatusComplete,
ID: "f60018ac-da79-4769-9509-c6c41d5efe7e",
LinkID: "3229e01f-adf3-4294-85f7-4acb01b3fbcf",
Name: "Extract zipped bag transfer",
Status: amclient.JobStatusComplete,
Tasks: []amclient.Task{
{
ID: "c134198c-9485-4f68-8d94-4da1e03b5e1b",
ExitCode: 0,
CreatedAt: amclient.TaskDateTime{Time: time.Date(2024, time.January, 18, 1, 27, 49, 0, time.UTC)},
StartedAt: amclient.TaskDateTime{Time: time.Date(2024, time.January, 18, 1, 27, 49, 0, time.UTC)},
CompletedAt: amclient.TaskDateTime{Time: time.Date(2024, time.January, 18, 1, 27, 49, 0, time.UTC)},
StartedAt: amclient.TaskDateTime{Time: startedAt},
CompletedAt: amclient.TaskDateTime{Time: completedAt},
Duration: amclient.TaskDuration(time.Second / 2),
},
},
},
{
ID: "c2128d39-2ace-47c5-8cac-39ded8d9c9ef",
LinkID: "208d441b-6938-44f9-b54a-bd73f05bc764",
Microservice: "Verify SIP compliance",
Name: "Verify SIP compliance",
Status: amclient.JobStatusComplete,
Tasks: []amclient.Task{
{
ID: "6f5beca3-71ad-446c-8f19-3bc4dea16c9b",
ExitCode: 0,
CreatedAt: amclient.TaskDateTime{Time: time.Date(2024, time.January, 1, 1, 27, 49, 0, time.UTC)},
StartedAt: amclient.TaskDateTime{Time: time.Date(2024, time.January, 18, 1, 27, 49, 0, time.UTC)},
CompletedAt: amclient.TaskDateTime{Time: time.Date(2024, time.January, 18, 1, 27, 49, 0, time.UTC)},
Duration: amclient.TaskDuration(time.Minute),
},
},
ID: "c2128d39-2ace-47c5-8cac-39ded8d9c9ef",
LinkID: "208d441b-6938-44f9-b54a-bd73f05bc764",
Name: "Verify bag, and restructure for compliance",
Status: amclient.JobStatusComplete,
},
}

Expand Down Expand Up @@ -109,13 +99,20 @@ func TestJobTracker(t *testing.T) {
)
},
pkgRec: func(m *fake_package.MockServiceMockRecorder) {
for _, job := range jobs {
pt := am.ConvertJobToPreservationTask(job)
pt.PreservationActionID = paID
m.CreatePreservationTask(mockutil.Context(), &pt).Return(nil)
}
m.CreatePreservationTask(
mockutil.Context(),
&datatypes.PreservationTask{
ID: 0,
TaskID: "f60018ac-da79-4769-9509-c6c41d5efe7e",
Name: "Extract zipped bag transfer",
Status: enums.PreservationTaskStatusDone,
StartedAt: sql.NullTime{Time: startedAt, Valid: true},
CompletedAt: sql.NullTime{Time: completedAt, Valid: true},
PreservationActionID: paID,
},
).Return(nil)
},
want: 2,
want: 1,
},
{
name: "Retryable error when AM returns 400 Bad Request",
Expand Down Expand Up @@ -188,11 +185,10 @@ func TestConvertJobToPreservationTask(t *testing.T) {
{
name: "Returns preservation task with computed time range",
job: amclient.Job{
ID: "f60018ac-da79-4769-9509-c6c41d5efe7e",
LinkID: "70669a5b-01e4-4ea0-ac70-10292f87da05",
Microservice: "Verify SIP compliance",
Name: "Move to processing directory",
Status: amclient.JobStatusComplete,
ID: "f60018ac-da79-4769-9509-c6c41d5efe7e",
LinkID: "70669a5b-01e4-4ea0-ac70-10292f87da05",
Name: "Move to processing directory",
Status: amclient.JobStatusComplete,
Tasks: []amclient.Task{
{
ID: "c134198c-9485-4f68-8d94-4da1e03b5e1b",
Expand Down Expand Up @@ -229,11 +225,10 @@ func TestConvertJobToPreservationTask(t *testing.T) {
{
name: "Returns NULL completedAt if job is still processing",
job: amclient.Job{
ID: "c2128d39-2ace-47c5-8cac-39ded8d9c9ef",
LinkID: "208d441b-6938-44f9-b54a-bd73f05bc764",
Microservice: "Verify SIP compliance",
Name: "Verify SIP compliance",
Status: amclient.JobStatusProcessing,
ID: "c2128d39-2ace-47c5-8cac-39ded8d9c9ef",
LinkID: "208d441b-6938-44f9-b54a-bd73f05bc764",
Name: "Verify SIP compliance",
Status: amclient.JobStatusProcessing,
Tasks: []amclient.Task{
{
ID: "6f5beca3-71ad-446c-8f19-3bc4dea16c9b",
Expand Down
18 changes: 8 additions & 10 deletions internal/am/poll_ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ func TestPollIngestActivity(t *testing.T) {

jobs := []amclient.Job{
{
ID: "7b7f7abd-e9c9-4c2e-9837-a65fa68cfcc8",
Name: "Move to processing directory",
Status: amclient.JobStatusComplete,
Microservice: "Verify SIP compliance",
LinkID: "70669a5b-01e4-4ea0-ac70-10292f87da05",
ID: "7b7f7abd-e9c9-4c2e-9837-a65fa68cfcc8",
Name: "Identify manually normalized files",
Status: amclient.JobStatusComplete,
LinkID: "15a2df8a-7b45-4c11-b6fa-884c9b7e5c67",
Tasks: []amclient.Task{
{
ID: "9dc0b71a-cbb1-40f4-9fa4-647cc16c8ed5",
Expand All @@ -59,11 +58,10 @@ func TestPollIngestActivity(t *testing.T) {
},
},
{
ID: "43222c5f-89c3-469a-9167-5330f9e33e46",
Name: "Verify SIP compliance",
Status: amclient.JobStatusComplete,
Microservice: "Verify SIP compliance",
LinkID: "208d441b-6938-44f9-b54a-bd73f05bc764",
ID: "43222c5f-89c3-469a-9167-5330f9e33e46",
Name: "Validate preservation derivatives",
Status: amclient.JobStatusComplete,
LinkID: "5b0042a2-2244-475c-85d5-41e4b11e65d6",
Tasks: []amclient.Task{
{
ID: "6f5beca3-71ad-446c-8f19-3bc4dea16c9b",
Expand Down
18 changes: 8 additions & 10 deletions internal/am/poll_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ func TestPollTransferActivity(t *testing.T) {

jobs := []amclient.Job{
{
ID: "e6e01ebb-a8f4-459d-b9a9-c6a8103e4750",
Name: "Extract zipped transfer",
Status: amclient.JobStatusComplete,
Microservice: "Approve transfer",
LinkID: "541f5994-73b0-45bb-9cb5-367c06a21be7",
ID: "e6e01ebb-a8f4-459d-b9a9-c6a8103e4750",
Name: "Extract zipped bag transfer",
Status: amclient.JobStatusComplete,
LinkID: "3229e01f-adf3-4294-85f7-4acb01b3fbcf",
Tasks: []amclient.Task{
{
ID: "11566538-66c5-4a20-aa70-77f7a9fa83d5",
Expand All @@ -52,11 +51,10 @@ func TestPollTransferActivity(t *testing.T) {
},
},
{
ID: "2bcdb038-8861-4ea7-a7bb-01d58efac38c",
Name: "Set transfer type: Standard",
Status: amclient.JobStatusComplete,
Microservice: "Verify transfer compliance",
LinkID: "045c43ae-d6cf-44f7-97d6-c8a602748565",
ID: "2bcdb038-8861-4ea7-a7bb-01d58efac38c",
Name: "Rename with transfer UUID",
Status: amclient.JobStatusComplete,
LinkID: "154dd501-a344-45a9-97e3-b30093da35f5",
Tasks: []amclient.Task{
{
ID: "53666170-0397-4962-8736-23295444b036",
Expand Down

0 comments on commit 92feb35

Please sign in to comment.