Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow pending job proposals to be revoked #8657

Merged
merged 2 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions core/services/feeds/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 26 additions & 2 deletions core/services/feeds/mocks/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 46 additions & 7 deletions core/services/feeds/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,31 @@ type ORM interface {
ListManagersByIDs(ids []int64) ([]FeedsManager, error)
UpdateManager(mgr FeedsManager, qopts ...pg.QOpt) error

CreateChainConfig(cfg ChainConfig, qopts ...pg.QOpt) (int64, error)
CreateBatchChainConfig(cfgs []ChainConfig, qopts ...pg.QOpt) ([]int64, error)
CreateChainConfig(cfg ChainConfig, qopts ...pg.QOpt) (int64, error)
DeleteChainConfig(id int64) (int64, error)
GetChainConfig(id int64) (*ChainConfig, error)
UpdateChainConfig(cfg ChainConfig) (int64, error)
ListChainConfigsByManagerIDs(mgrIDs []int64) ([]ChainConfig, error)
UpdateChainConfig(cfg ChainConfig) (int64, error)

CreateJobProposal(jp *JobProposal) (int64, error)
CountJobProposals() (int64, error)
CountJobProposalsByStatus() (counts *JobProposalCounts, err error)
CreateJobProposal(jp *JobProposal) (int64, error)
DeleteProposal(id int64, qopts ...pg.QOpt) error
GetJobProposal(id int64, qopts ...pg.QOpt) (*JobProposal, error)
GetJobProposalByRemoteUUID(uuid uuid.UUID) (*JobProposal, error)
ListJobProposals() (jps []JobProposal, err error)
ListJobProposalsByManagersIDs(ids []int64, qopts ...pg.QOpt) ([]JobProposal, error)
RevokeProposal(id int64, qopts ...pg.QOpt) error
UpdateJobProposalStatus(id int64, status JobProposalStatus, qopts ...pg.QOpt) error // NEEDED?
UpsertJobProposal(jp *JobProposal, qopts ...pg.QOpt) (int64, error)

ApproveSpec(id int64, externalJobID uuid.UUID, qopts ...pg.QOpt) error
CancelSpec(id int64, qopts ...pg.QOpt) error
CreateSpec(spec JobProposalSpec, qopts ...pg.QOpt) (int64, error)
ExistsSpecByJobProposalIDAndVersion(jpID int64, version int32, qopts ...pg.QOpt) (exists bool, err error)
GetLatestSpec(jpID int64) (*JobProposalSpec, error)
GetApprovedSpec(jpID int64, qopts ...pg.QOpt) (*JobProposalSpec, error)
GetLatestSpec(jpID int64) (*JobProposalSpec, error)
GetSpec(id int64, qopts ...pg.QOpt) (*JobProposalSpec, error)
ListSpecsByJobProposalIDs(ids []int64, qopts ...pg.QOpt) ([]JobProposalSpec, error)
RejectSpec(id int64, qopts ...pg.QOpt) error
Expand Down Expand Up @@ -345,16 +346,19 @@ WHERE id = $1
return jp, errors.Wrap(err, "GetJobProposal failed")
}

// GetJobProposalByRemoteUUID gets a job proposal by the remote FMS uuid.
// GetJobProposalByRemoteUUID gets a job proposal by the remote FMS uuid. This
// method will filter out the deleted job proposals. To get all job proposals,
// use the GetJobProposal get by id method.
func (o *orm) GetJobProposalByRemoteUUID(id uuid.UUID) (jp *JobProposal, err error) {
stmt := `
SELECT *
FROM job_proposals
WHERE remote_uuid = $1;
WHERE remote_uuid = $1
AND status <> $2;
`

jp = new(JobProposal)
err = o.q.Get(jp, stmt, id)
err = o.q.Get(jp, stmt, id, JobProposalStatusDeleted)
return jp, errors.Wrap(err, "GetJobProposalByRemoteUUID failed")
}

Expand Down Expand Up @@ -494,6 +498,7 @@ UPDATE job_proposals
SET status = (
CASE
WHEN status = 'deleted' THEN 'deleted'::job_proposal_status
WHEN status = 'revoked' THEN 'revoked'::job_proposal_status
ELSE 'cancelled'::job_proposal_status
END
),
Expand Down Expand Up @@ -688,6 +693,40 @@ WHERE id = $1
return nil
}

// RevokeProposal revokes a job proposal with a pending job spec. An approved
// proposal cannot be revoked. A revoked proposal's job spec cannot be approved
// or edited, but the job can be reproposed by FMS.
func (o *orm) RevokeProposal(id int64, qopts ...pg.QOpt) error {
stmt := `
UPDATE job_proposals
SET status = (
CASE
WHEN status = 'approved' THEN 'approved'::job_proposal_status
WHEN status = 'deleted' THEN 'deleted'::job_proposal_status
ELSE 'revoked'::job_proposal_status
END
),
pending_update = FALSE,
updated_at = NOW()
WHERE id = $1
`

result, err := o.q.WithOpts(qopts...).Exec(stmt, id)
if err != nil {
return err
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return err
}

if rowsAffected == 0 {
return sql.ErrNoRows
}

return nil
}

// UpdateSpecDefinition updates the definition of a job proposal spec by id.
func (o *orm) UpdateSpecDefinition(id int64, spec string, qopts ...pg.QOpt) error {
stmt := `
Expand Down
121 changes: 121 additions & 0 deletions core/services/feeds/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ func Test_ORM_GetJobProposal(t *testing.T) {
orm := setupORM(t)
fmID := createFeedsManager(t, orm)
remoteUUID := uuid.NewV4()
deletedUUID := uuid.NewV4()
name := null.StringFrom("jp1")

jp := &feeds.JobProposal{
Expand All @@ -485,9 +486,19 @@ func Test_ORM_GetJobProposal(t *testing.T) {
FeedsManagerID: fmID,
}

deletedJp := &feeds.JobProposal{
Name: name,
RemoteUUID: deletedUUID,
Status: feeds.JobProposalStatusDeleted,
FeedsManagerID: fmID,
}

id, err := orm.CreateJobProposal(jp)
require.NoError(t, err)

_, err = orm.CreateJobProposal(deletedJp)
require.NoError(t, err)

assertJobEquals := func(actual *feeds.JobProposal) {
assert.Equal(t, id, actual.ID)
assert.Equal(t, name, actual.Name)
Expand Down Expand Up @@ -515,6 +526,9 @@ func Test_ORM_GetJobProposal(t *testing.T) {

assertJobEquals(actual)

_, err = orm.GetJobProposalByRemoteUUID(deletedUUID)
require.Error(t, err)

_, err = orm.GetJobProposalByRemoteUUID(uuid.NewV4())
require.Error(t, err)
})
Expand Down Expand Up @@ -941,6 +955,113 @@ func Test_ORM_DeleteProposal(t *testing.T) {
}
}

func Test_ORM_RevokeJob(t *testing.T) {
t.Parallel()

testCases := []struct {
name string
before func(orm *TestORM) int64
wantProposalStatus feeds.JobProposalStatus
wantErr string
}{
{
name: "pending proposal",
before: func(orm *TestORM) int64 {
fmID := createFeedsManager(t, orm)
jpID := createJobProposal(t, orm, feeds.JobProposalStatusPending, fmID)

return jpID
},
wantProposalStatus: feeds.JobProposalStatusRevoked,
},
{
name: "approved proposal",
before: func(orm *TestORM) int64 {
fmID := createFeedsManager(t, orm)
jpID := createJobProposal(t, orm, feeds.JobProposalStatusPending, fmID)
specID := createJobSpec(t, orm, int64(jpID))

externalJobID := uuid.NullUUID{UUID: uuid.NewV4(), Valid: true}

// Defer the FK requirement of an existing job for a job proposal.
require.NoError(t, utils.JustError(orm.db.Exec(
`SET CONSTRAINTS job_proposals_job_id_fkey DEFERRED`,
)))

err := orm.ApproveSpec(specID, externalJobID.UUID)
require.NoError(t, err)

return jpID
},
wantProposalStatus: feeds.JobProposalStatusApproved,
},
{
name: "cancelled proposal",
before: func(orm *TestORM) int64 {
fmID := createFeedsManager(t, orm)
jpID := createJobProposal(t, orm, feeds.JobProposalStatusCancelled, fmID)

return jpID
},
wantProposalStatus: feeds.JobProposalStatusRevoked,
},
{
name: "rejected proposal",
before: func(orm *TestORM) int64 {
fmID := createFeedsManager(t, orm)
jpID := createJobProposal(t, orm, feeds.JobProposalStatusRejected, fmID)

return jpID
},
wantProposalStatus: feeds.JobProposalStatusRevoked,
},
{
name: "deleted proposal",
before: func(orm *TestORM) int64 {
fmID := createFeedsManager(t, orm)
jpID := createJobProposal(t, orm, feeds.JobProposalStatusDeleted, fmID)

return jpID
},
wantProposalStatus: feeds.JobProposalStatusDeleted,
},
{
name: "not found",
before: func(orm *TestORM) int64 {
return 0
},
wantErr: "sql: no rows in result set",
},
}

for _, tc := range testCases {
tc := tc

t.Run(tc.name, func(t *testing.T) {
orm := setupORM(t)

jpID := tc.before(orm)

err := orm.RevokeProposal(jpID)

if tc.wantErr != "" {
require.EqualError(t, err, tc.wantErr)
} else {
require.NoError(t, err)

actualJP, err := orm.GetJobProposal(jpID)
require.NoError(t, err)

assert.Equal(t, tc.wantProposalStatus, actualJP.Status)
assert.False(t, actualJP.PendingUpdate)

assert.Equal(t, jpID, actualJP.ID)
assert.Equal(t, tc.wantProposalStatus, actualJP.Status)
}
})
}
}

func Test_ORM_ExistsSpecByJobProposalIDAndVersion(t *testing.T) {
t.Parallel()

Expand Down
Loading