Skip to content

Commit

Permalink
Check for duplicated SIPs
Browse files Browse the repository at this point in the history
In the preprocessing workflow, expect a compressed SIP instead
of a directory. If checkDuplicates is enabled in the configuration,
calculate the SIP checksum and persist the SIP name and checksum
in the DB. If there is already an entry with the same checksum, fail
the workflow with a validation error.

- Add activity to calculate the SIP checksum.
- Add local activity to check for duplicates.
- Include archiveextract activity from temporal-activities.
- Add events for the three activities to the result.
  • Loading branch information
jraddaoui committed Jan 16, 2025
1 parent 8f46095 commit d10bae7
Show file tree
Hide file tree
Showing 11 changed files with 729 additions and 33 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ gen-mock: # @HELP Generate mocks.
gen-mock: $(MOCKGEN)
mockgen -typed -destination=./internal/fformat/fake/mock_identifier.go -package=fake github.com/artefactual-sdps/preprocessing-sfa/internal/fformat Identifier
mockgen -typed -destination=./internal/fvalidate/fake/mock_validator.go -package=fake github.com/artefactual-sdps/preprocessing-sfa/internal/fvalidate Validator
mockgen -typed -destination=./internal/persistence/fake/mock_service.go -package=fake github.com/artefactual-sdps/preprocessing-sfa/internal/persistence Service

golines: # @HELP Run the golines formatter to fix long lines.
golines: GOLINES_OUT_MODE ?= write-output
Expand Down
66 changes: 39 additions & 27 deletions cmd/worker/workercmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"ariga.io/sqlcomment"
"entgo.io/ent/dialect/sql"
bagit_gython "github.com/artefactual-labs/bagit-gython"
"github.com/artefactual-sdps/temporal-activities/archiveextract"
"github.com/artefactual-sdps/temporal-activities/bagcreate"
"github.com/artefactual-sdps/temporal-activities/bagvalidate"
"github.com/artefactual-sdps/temporal-activities/ffvalidate"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/artefactual-sdps/preprocessing-sfa/internal/fformat"
"github.com/artefactual-sdps/preprocessing-sfa/internal/fvalidate"
"github.com/artefactual-sdps/preprocessing-sfa/internal/persistence"
entclient "github.com/artefactual-sdps/preprocessing-sfa/internal/persistence/ent/client"
"github.com/artefactual-sdps/preprocessing-sfa/internal/persistence/ent/db"
"github.com/artefactual-sdps/preprocessing-sfa/internal/workflow"
)
Expand Down Expand Up @@ -75,11 +77,47 @@ func (m *Main) Run(ctx context.Context) error {
return err
}

var psvc persistence.Service
if m.cfg.CheckDuplicates {
sqlDB, err := persistence.Open(m.cfg.Persistence.Driver, m.cfg.Persistence.DSN)
if err != nil {
m.logger.Error(err, "Error initializing database pool.")
return err
}
m.dbClient = db.NewClient(
db.Driver(
sqlcomment.NewDriver(
sql.OpenDB(m.cfg.Persistence.Driver, sqlDB),
sqlcomment.WithDriverVerTag(),
sqlcomment.WithTags(sqlcomment.Tags{
sqlcomment.KeyApplication: Name,
}),
),
),
)
if m.cfg.Persistence.Migrate {
err = m.dbClient.Schema.Create(ctx)
if err != nil {
m.logger.Error(err, "Error migrating database.")
return err
}
}
psvc = entclient.New(m.dbClient)
}

w.RegisterWorkflowWithOptions(
workflow.NewPreprocessingWorkflow(m.cfg.SharedPath).Execute,
workflow.NewPreprocessingWorkflow(m.cfg.SharedPath, m.cfg.CheckDuplicates, psvc).Execute,
temporalsdk_workflow.RegisterOptions{Name: m.cfg.Temporal.WorkflowName},
)

w.RegisterActivityWithOptions(
activities.NewChecksumSIP().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.ChecksumSIPName},
)
w.RegisterActivityWithOptions(
archiveextract.New(archiveextract.Config{}).Execute,
temporalsdk_activity.RegisterOptions{Name: archiveextract.Name},
)
w.RegisterActivityWithOptions(
bagvalidate.New(m.bagValidator).Execute,
temporalsdk_activity.RegisterOptions{Name: bagvalidate.Name},
Expand Down Expand Up @@ -144,32 +182,6 @@ func (m *Main) Run(ctx context.Context) error {
temporalsdk_activity.RegisterOptions{Name: bagcreate.Name},
)

if m.cfg.CheckDuplicates {
sqlDB, err := persistence.Open(m.cfg.Persistence.Driver, m.cfg.Persistence.DSN)
if err != nil {
m.logger.Error(err, "Error initializing database pool.")
return err
}
m.dbClient = db.NewClient(
db.Driver(
sqlcomment.NewDriver(
sql.OpenDB(m.cfg.Persistence.Driver, sqlDB),
sqlcomment.WithDriverVerTag(),
sqlcomment.WithTags(sqlcomment.Tags{
sqlcomment.KeyApplication: Name,
}),
),
),
)
if m.cfg.Persistence.Migrate {
err = m.dbClient.Schema.Create(ctx)
if err != nil {
m.logger.Error(err, "Error migrating database.")
return err
}
}
}

if err := w.Start(); err != nil {
m.logger.Error(err, "Preprocessing worker failed to start.")
return err
Expand Down
18 changes: 18 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
ariga.io/atlas v0.19.2 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/agext/levenshtein v1.2.3 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/antchfx/xpath v1.3.2 // indirect
github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect
github.com/aws/aws-sdk-go v1.55.5 // indirect
Expand All @@ -54,7 +55,12 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect
github.com/aws/smithy-go v1.20.3 // indirect
github.com/bodgit/plumbing v1.2.0 // indirect
github.com/bodgit/sevenzip v1.3.0 // indirect
github.com/bodgit/windows v1.0.0 // indirect
github.com/connesc/cipherio v0.2.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dsnet/compress v0.0.1 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand All @@ -63,22 +69,31 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/safeopen v0.0.0-20240125081138-66b54d5181c6 // indirect
github.com/google/wire v0.6.0 // indirect
github.com/googleapis/gax-go/v2 v2.13.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/hcl/v2 v2.19.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/pgzip v1.2.5 // indirect
github.com/kluctl/go-embed-python v0.0.0-3.12.3-20240415-1 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mholt/archiver/v4 v4.0.0-alpha.8 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/nwaples/rardecode/v2 v2.0.0-beta.2 // indirect
github.com/nyudlts/go-bagit v0.3.0-alpha.0.20240515212815-8dab411c23af // indirect
github.com/otiai10/copy v1.14.0 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/richardlehane/characterize v1.0.0 // indirect
github.com/richardlehane/match v1.0.5 // indirect
Expand All @@ -98,6 +113,8 @@ require (
github.com/spf13/cast v1.6.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/therootcompany/xz v1.0.1 // indirect
github.com/ulikunitz/xz v0.5.10 // indirect
github.com/zclconf/go-cty v1.14.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
Expand All @@ -108,6 +125,7 @@ require (
go.temporal.io/api v1.32.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 // indirect
golang.org/x/image v0.17.0 // indirect
golang.org/x/mod v0.20.0 // indirect
Expand Down
Loading

0 comments on commit d10bae7

Please sign in to comment.