Skip to content

Commit

Permalink
backport of commit 5d12ca4 (#19725)
Browse files Browse the repository at this point in the history
Co-authored-by: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com>
  • Loading branch information
hc-github-team-nomad-core and pkazmierczak authored Jan 12, 2024
1 parent 6a1936c commit 0843561
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .changelog/19609.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
state: Fixed a bug where purged jobs would not get new deployments
```
4 changes: 1 addition & 3 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
stoppedAlloc, lostAlloc,
activeJobRunningAlloc, activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc,
stoppedJobStoppedAlloc, stoppedJobLostAlloc,
purgedJobCompleteAlloc,
},
[]*structs.Allocation{},
)
Expand Down Expand Up @@ -576,7 +575,6 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
stoppedAlloc, lostAlloc,
activeJobRunningAlloc, activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc,
stoppedJobStoppedAlloc, stoppedJobLostAlloc,
purgedJobCompleteAlloc,
},
[]*structs.Allocation{},
)
Expand All @@ -598,7 +596,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
// than that of the job).
// 3. The active job remains since it is active, even though the allocations are otherwise
// eligible for GC. However, the inactive allocation is GCed for it.
// 4. The eval and allocation for the purged job are GCed.
// 4. The eval and allocation for the purged job are deleted.
assertCorrectJobEvalAlloc(
memdb.NewWatchSet(),
[]*structs.Job{deadJob, activeJob, stoppedJob},
Expand Down
121 changes: 119 additions & 2 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,18 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro
txn := s.db.WriteTxn(index)
defer txn.Abort()

err := s.DeleteDeploymentTxn(index, deploymentIDs, txn)
if err == nil {
return txn.Commit()
}

return err
}

// DeleteDeploymentTxn is used to delete a set of deployments by ID, like
// DeleteDeployment but in a transaction. Useful when making multiple
// modifications atomically.
func (s *StateStore) DeleteDeploymentTxn(index uint64, deploymentIDs []string, txn Txn) error {
if len(deploymentIDs) == 0 {
return nil
}
Expand All @@ -817,7 +829,7 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro
return fmt.Errorf("deployment lookup failed: %v", err)
}
if existing == nil {
return fmt.Errorf("deployment not found")
continue
}

// Delete the deployment
Expand All @@ -830,7 +842,50 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro
return fmt.Errorf("index update failed: %v", err)
}

return txn.Commit()
return nil
}

// DeleteAlloc removes the allocations identified by allocIDs. It opens its own
// write transaction at the given index, delegates the removal to
// DeleteAllocTxn, and commits only when that call reports no error; on failure
// the error is returned and nothing is committed.
func (s *StateStore) DeleteAlloc(index uint64, allocIDs []string) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	// Bail out before committing if any part of the deletion failed.
	if err := s.DeleteAllocTxn(index, allocIDs, txn); err != nil {
		return err
	}

	return txn.Commit()
}

// DeleteAllocTxn removes the allocations identified by allocIDs using the
// caller-supplied transaction, mirroring DeleteAlloc. It never commits txn, so
// callers can combine it with other modifications and commit them atomically.
//
// An empty allocIDs list is a no-op. IDs with no matching entry in the
// "allocs" table are skipped rather than treated as errors. After the loop the
// "allocs" entry in the "index" table is set to index.
func (s *StateStore) DeleteAllocTxn(index uint64, allocIDs []string, txn Txn) error {
	if len(allocIDs) == 0 {
		return nil
	}

	for _, id := range allocIDs {
		// Fetch the stored object so it can be handed to Delete.
		found, err := txn.First("allocs", "id", id)
		if err != nil {
			return fmt.Errorf("alloc lookup failed: %v", err)
		}

		// Only delete allocations that are actually present; unknown IDs
		// are silently ignored.
		if found != nil {
			if err := txn.Delete("allocs", found); err != nil {
				return fmt.Errorf("alloc delete failed: %v", err)
			}
		}
	}

	// Record the modification index for the allocs table.
	if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// UpsertScalingEvent is used to insert a new scaling event.
Expand Down Expand Up @@ -1894,6 +1949,68 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
return err
}

// Delete job deployments
deployments, err := s.DeploymentsByJobID(nil, namespace, job.ID, true)
if err != nil {
return fmt.Errorf("deployment lookup for job %s failed: %v", job.ID, err)
}

deploymentIDs := []string{}
for _, d := range deployments {
deploymentIDs = append(deploymentIDs, d.ID)
}

if err := s.DeleteDeploymentTxn(index, deploymentIDs, txn); err != nil {
return err
}

// Mark all "pending" evals for this job as "complete"
evals, err := s.EvalsByJob(nil, namespace, job.ID)
if err != nil {
return fmt.Errorf("eval lookup for job %s failed: %v", job.ID, err)
}

for _, eval := range evals {
existing, err := txn.First("evals", "id", eval.ID)
if err != nil {
return fmt.Errorf("eval lookup failed: %v", err)
}
if existing == nil {
continue
}

if existing.(*structs.Evaluation).Status != structs.EvalStatusPending {
continue
}

eval := existing.(*structs.Evaluation).Copy()
eval.Status = structs.EvalStatusComplete
eval.StatusDescription = fmt.Sprintf("job %s deleted", job.ID)

// Insert the eval
if err := txn.Insert("evals", eval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}

// Delete job allocs
allocs, err := s.AllocsByJob(nil, namespace, job.ID, true)
if err != nil {
return fmt.Errorf("alloc lookup for job %s failed: %v", job.ID, err)
}

allocIDs := []string{}
for _, a := range allocs {
allocIDs = append(allocIDs, a.ID)
}

if err := s.DeleteAllocTxn(index, allocIDs, txn); err != nil {
return err
}

// Cleanup plugins registered by this job, before we delete the summary
err = s.deleteJobFromPlugins(index, txn, job)
if err != nil {
Expand Down
23 changes: 1 addition & 22 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4277,27 +4277,6 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) {
require.True(t, plug.ControllerRequired)
require.False(t, plug.IsEmpty())

updateAllocsFn(allocIDs, SERVER,
func(alloc *structs.Allocation) {
alloc.DesiredStatus = structs.AllocDesiredStatusStop
})

updateAllocsFn(allocIDs, CLIENT,
func(alloc *structs.Allocation) {
alloc.ClientStatus = structs.AllocClientStatusComplete
})

plug = checkPlugin(pluginCounts{
controllerFingerprints: 1,
nodeFingerprints: 2,
controllersHealthy: 1,
nodesHealthy: 2,
controllersExpected: 0,
nodesExpected: 0,
})
require.True(t, plug.ControllerRequired)
require.False(t, plug.IsEmpty())

for _, node := range nodes {
updateNodeFn(node.ID, func(node *structs.Node) {
node.CSIControllerPlugins = nil
Expand Down Expand Up @@ -7128,7 +7107,7 @@ func TestStateStore_AllocsForRegisteredJob(t *testing.T) {
t.Fatalf("err: %v", err)
}

expected := len(allocs) + len(allocs1)
expected := len(allocs1) // state.DeleteJob corresponds to stop -purge, so all allocs from the original job should be gone
if len(out) != expected {
t.Fatalf("expected: %v, actual: %v", expected, len(out))
}
Expand Down

0 comments on commit 0843561

Please sign in to comment.