fsm: adjust timeTableLimit according to longest GC threshold #24112

Draft · wants to merge 12 commits into base: main
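This draft removes the FSM time table from GC decisions: each GC pass now derives a wall-clock cutoff from its configured threshold and compares it against a timestamp carried on the object itself (SubmitTime, ModifyTime, StatusUpdatedAt, or CreateTime) instead of mapping the cutoff to a Raft index. A minimal runnable sketch of that pattern, using a made-up nanosecond timestamp and threshold rather than code from this diff:

```go
package main

import (
	"fmt"
	"time"
)

// oldEnoughForGC sketches the check each GC pass in this diff performs: an
// object is a candidate only if its own timestamp (unix nanoseconds here, as
// with job SubmitTime or eval/alloc ModifyTime) falls before the wall-clock
// cutoff derived from the configured GC threshold.
func oldEnoughForGC(modifyTimeNanos int64, gcThreshold time.Duration) bool {
	cutoff := time.Now().UTC().Add(-1 * gcThreshold) // what getCutoffTime computes
	return time.Unix(0, modifyTimeNanos).Before(cutoff)
}

func main() {
	fiveHoursOld := time.Now().Add(-5 * time.Hour).UnixNano()
	fmt.Println(oldEnoughForGC(fiveHoursOld, 4*time.Hour)) // true: older than a 4h threshold
	fmt.Println(oldEnoughForGC(fiveHoursOld, 6*time.Hour)) // false: still newer than a 6h threshold
}
```

The `getCutoffTime` helper at the bottom of `core_sched.go` below is exactly this cutoff computation.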
39 changes: 12 additions & 27 deletions nomad/blocked_evals.go
@@ -76,10 +76,6 @@ type BlockedEvals struct {
// duplicates.
duplicateCh chan struct{}

// timetable is used to correlate indexes with their insertion time. This
// allows us to prune based on time.
timetable *TimeTable

// stopCh is used to stop any created goroutines.
stopCh chan struct{}
}
@@ -143,12 +139,6 @@ func (b *BlockedEvals) SetEnabled(enabled bool) {
}
}

func (b *BlockedEvals) SetTimetable(timetable *TimeTable) {
b.l.Lock()
b.timetable = timetable
b.l.Unlock()
}

// Block tracks the passed evaluation and enqueues it into the eval broker when
// a suitable node calls unblock.
func (b *BlockedEvals) Block(eval *structs.Evaluation) {
@@ -700,7 +690,6 @@ func (b *BlockedEvals) Flush() {
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[structs.NamespacedID]string)
b.unblockIndexes = make(map[string]uint64)
b.timetable = nil
b.duplicates = nil
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
@@ -774,29 +763,25 @@ func (b *BlockedEvals) prune(stopCh <-chan struct{}) {
return
case t := <-ticker.C:
cutoff := t.UTC().Add(-1 * pruneThreshold)
b.pruneUnblockIndexes(cutoff)
// b.pruneUnblockIndexes(cutoff)
b.pruneStats(cutoff)
}
}
}

// pruneUnblockIndexes is used to prune any tracked entry that is excessively
// old. This protects against unbounded growth of the map.
func (b *BlockedEvals) pruneUnblockIndexes(cutoff time.Time) {
b.l.Lock()
defer b.l.Unlock()

if b.timetable == nil {
return
}

oldThreshold := b.timetable.NearestIndex(cutoff)
for key, index := range b.unblockIndexes {
if index < oldThreshold {
delete(b.unblockIndexes, key)
}
}
}
// func (b *BlockedEvals) pruneUnblockIndexes(cutoff time.Time) {
// b.l.Lock()
// defer b.l.Unlock()

// oldThreshold := b.timetable.NearestIndex(cutoff)
// for key, index := range b.unblockIndexes {
// if index < oldThreshold {
// delete(b.unblockIndexes, key)
// }
// }
// }
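Not part of this draft, but as an illustration of one possible time-based replacement: if the unblock indexes were recorded together with the wall-clock time they were added, the pruning above could keep working without the timetable. Everything in this sketch (the package name, entry type, and helper) is hypothetical:

```go
package blockedevals // hypothetical package, for illustration only

import "time"

// unblockEntry pairs a Raft index with the wall-clock time it was recorded
// (hypothetical; BlockedEvals currently tracks only map[string]uint64).
type unblockEntry struct {
	index    uint64
	recorded time.Time
}

// pruneUnblockEntriesByTime drops entries recorded before the cutoff,
// mirroring what pruneUnblockIndexes did via timetable.NearestIndex.
func pruneUnblockEntriesByTime(entries map[string]unblockEntry, cutoff time.Time) {
	for key, e := range entries {
		if e.recorded.Before(cutoff) {
			delete(entries, key)
		}
	}
}
```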

// pruneStats is used to prune any zero value stats that are excessively old.
func (b *BlockedEvals) pruneStats(cutoff time.Time) {
102 changes: 38 additions & 64 deletions nomad/core_sched.go
@@ -7,7 +7,6 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"strings"
"time"

@@ -116,8 +115,7 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
return err
}

oldThreshold := c.getThreshold(eval, "job",
"job_gc_threshold", c.srv.config.JobGCThreshold)
cutoffTime := c.getCutoffTime(c.srv.config.JobGCThreshold)

// Collect the allocations, evaluations and jobs to GC
var gcAlloc, gcEval []string
@@ -128,7 +126,8 @@
job := i.(*structs.Job)

// Ignore new jobs.
if job.CreateIndex > oldThreshold {
st := time.Unix(0, job.SubmitTime)
if st.After(cutoffTime) {
continue
}

@@ -142,7 +141,7 @@
allEvalsGC := true
var jobAlloc, jobEval []string
for _, eval := range evals {
gc, allocs, err := c.gcEval(eval, oldThreshold, true)
gc, allocs, err := c.gcEval(eval, cutoffTime, true)
if err != nil {
continue OUTER
} else if gc {
@@ -252,22 +251,20 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
return err
}

oldThreshold := c.getThreshold(eval, "eval",
"eval_gc_threshold", c.srv.config.EvalGCThreshold)
batchOldThreshold := c.getThreshold(eval, "eval",
"batch_eval_gc_threshold", c.srv.config.BatchEvalGCThreshold)
cutoffTime := c.getCutoffTime(c.srv.config.EvalGCThreshold)
batchCutoffTime := c.getCutoffTime(c.srv.config.BatchEvalGCThreshold)

// Collect the allocations and evaluations to GC
var gcAlloc, gcEval []string
for raw := iter.Next(); raw != nil; raw = iter.Next() {
eval := raw.(*structs.Evaluation)

gcThreshold := oldThreshold
gcCutoffTime := cutoffTime
if eval.Type == structs.JobTypeBatch {
gcThreshold = batchOldThreshold
gcCutoffTime = batchCutoffTime
}

gc, allocs, err := c.gcEval(eval, gcThreshold, false)
gc, allocs, err := c.gcEval(eval, gcCutoffTime, false)
if err != nil {
return err
}
@@ -293,10 +290,12 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// allocs are not older than the threshold. If the eval should be garbage
// collected, the associated alloc ids that should also be removed are
// returned
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, allowBatch bool) (
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, cutoffTime time.Time, allowBatch bool) (
bool, []string, error) {

// Ignore non-terminal and new evaluations
if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex {
mt := time.Unix(0, eval.ModifyTime)
if !eval.TerminalStatus() || mt.After(cutoffTime) {
return false, nil, nil
}

@@ -335,7 +334,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
// If we cannot collect outright, check if a partial GC may occur
collect := job == nil || job.Status == structs.JobStatusDead && (job.Stop || allowBatch)
if !collect {
oldAllocs := olderVersionTerminalAllocs(allocs, job, thresholdIndex)
oldAllocs := olderVersionTerminalAllocs(allocs, job, cutoffTime)
gcEval := (len(oldAllocs) == len(allocs))
return gcEval, oldAllocs, nil
}
@@ -345,7 +344,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
gcEval := true
var gcAllocIDs []string
for _, alloc := range allocs {
if !allocGCEligible(alloc, job, time.Now(), thresholdIndex) {
if !allocGCEligible(alloc, job, time.Now(), cutoffTime) {
// Can't GC the evaluation since not all of the allocations are
// terminal
gcEval = false
@@ -360,10 +359,11 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,

// olderVersionTerminalAllocs returns a list of terminal allocations that belong to the evaluation and may be
// GCed.
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job, thresholdIndex uint64) []string {
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job, cutoffTime time.Time) []string {
var ret []string
for _, alloc := range allocs {
if alloc.CreateIndex < job.JobModifyIndex && alloc.ModifyIndex < thresholdIndex && alloc.TerminalStatus() {
mi := time.Unix(0, alloc.ModifyTime)
if alloc.CreateIndex < job.JobModifyIndex && mi.Before(cutoffTime) && alloc.TerminalStatus() {
ret = append(ret, alloc.ID)
}
}
@@ -439,8 +439,7 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
return err
}

oldThreshold := c.getThreshold(eval, "node",
"node_gc_threshold", c.srv.config.NodeGCThreshold)
cutoffTime := c.getCutoffTime(c.srv.config.NodeGCThreshold)

// Collect the nodes to GC
var gcNode []string
@@ -453,7 +452,8 @@
node := raw.(*structs.Node)

// Ignore non-terminal and new nodes
if !node.TerminalStatus() || node.ModifyIndex > oldThreshold {
st := time.Unix(node.StatusUpdatedAt, 0)
if !node.TerminalStatus() || st.After(cutoffTime) {
continue
}

@@ -536,8 +536,7 @@ func (c *CoreScheduler) deploymentGC(eval *structs.Evaluation) error {
return err
}

oldThreshold := c.getThreshold(eval, "deployment",
"deployment_gc_threshold", c.srv.config.DeploymentGCThreshold)
cutoffTime := c.getCutoffTime(c.srv.config.DeploymentGCThreshold)

// Collect the deployments to GC
var gcDeployment []string
@@ -551,7 +550,8 @@
deploy := raw.(*structs.Deployment)

// Ignore non-terminal and new deployments
if deploy.Active() || deploy.ModifyIndex > oldThreshold {
mt := time.Unix(0, deploy.ModifyTime)
if deploy.Active() || mt.After(cutoffTime) {
continue
}

@@ -628,9 +628,10 @@ func (c *CoreScheduler) partitionDeploymentReap(deployments []string, batchSize

// allocGCEligible returns if the allocation is eligible to be garbage collected
// according to its terminal status and its reschedule trackers
func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time, thresholdIndex uint64) bool {
func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime, cutoffTime time.Time) bool {
// Not in a terminal status and old enough
if !a.TerminalStatus() || a.ModifyIndex > thresholdIndex {
mt := time.Unix(0, a.ModifyTime)
if !a.TerminalStatus() || mt.After(cutoffTime) {
return false
}

@@ -728,14 +729,14 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
return err
}

oldThreshold := c.getThreshold(eval, "CSI volume claim",
"csi_volume_claim_gc_threshold", c.srv.config.CSIVolumeClaimGCThreshold)
cutoffTime := c.getCutoffTime(c.srv.config.CSIVolumeClaimGCThreshold)

for i := iter.Next(); i != nil; i = iter.Next() {
vol := i.(*structs.CSIVolume)

// Ignore new volumes
if vol.CreateIndex > oldThreshold {
mt := time.Unix(0, vol.ModifyTime)
if mt.After(cutoffTime) {
continue
}

@@ -768,14 +769,14 @@ func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error {
return err
}

oldThreshold := c.getThreshold(eval, "CSI plugin",
"csi_plugin_gc_threshold", c.srv.config.CSIPluginGCThreshold)
cutoffTime := c.getCutoffTime(c.srv.config.CSIPluginGCThreshold)

for i := iter.Next(); i != nil; i = iter.Next() {
plugin := i.(*structs.CSIPlugin)

// Ignore new plugins
if plugin.CreateIndex > oldThreshold {
mt := time.Unix(0, plugin.ModifyTime)
if mt.After(cutoffTime) {
continue
}

@@ -829,15 +830,7 @@ func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool)
return nil
}

// The object name is logged within the getThreshold function, therefore we
// want to be clear what token type this trigger is for.
tokenScope := "local"
if global {
tokenScope = "global"
}

expiryThresholdIdx := c.getThreshold(eval, tokenScope+" expired ACL tokens",
"acl_token_expiration_gc_threshold", c.srv.config.ACLTokenExpirationGCThreshold)
cutoffTime := c.getCutoffTime(c.srv.config.ACLTokenExpirationGCThreshold)

expiredIter, err := c.snap.ACLTokensByExpired(global)
if err != nil {
@@ -868,7 +861,7 @@ func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool)

// Check if the token is recent enough to skip, otherwise we'll delete
// it.
if token.CreateIndex > expiryThresholdIdx {
if token.CreateTime.After(cutoffTime) {
continue
}

@@ -1289,26 +1282,7 @@ func (c *CoreScheduler) rotateVariables(iter memdb.ResultIterator, eval *structs
return nil
}

// getThreshold returns the index threshold for determining whether an
// object is old enough to GC
func (c *CoreScheduler) getThreshold(eval *structs.Evaluation, objectName, configName string, configThreshold time.Duration) uint64 {
var oldThreshold uint64
if eval.JobID == structs.CoreJobForceGC {
// The GC was forced, so set the threshold to its maximum so
// everything will GC.
oldThreshold = math.MaxUint64
c.logger.Debug(fmt.Sprintf("forced %s GC", objectName))
} else {
// Compute the old threshold limit for GC using the FSM
// time table. This is a rough mapping of a time to the
// Raft index it belongs to.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * configThreshold)
oldThreshold = tt.NearestIndex(cutoff)
c.logger.Debug(
fmt.Sprintf("%s GC scanning before cutoff index", objectName),
"index", oldThreshold,
configName, configThreshold)
}
return oldThreshold
// getCutoffTime returns the cutoff time: objects whose timestamps fall before it are old enough to be GCd
func (c *CoreScheduler) getCutoffTime(configThreshold time.Duration) time.Time {
return time.Now().UTC().Add(-1 * configThreshold)
}
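A detail worth noting across the hunks above: the timestamp fields compared against the cutoff are not all in the same representation. Job SubmitTime and the various ModifyTime fields are unix nanoseconds, node StatusUpdatedAt is unix seconds, and ACL token CreateTime is already a time.Time. A small runnable sketch of the three conversions, with made-up values:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	cutoff := time.Now().UTC().Add(-4 * time.Hour) // e.g. a 4h GC threshold

	// Unix-nanosecond fields (job.SubmitTime, eval/alloc/deploy/vol/plugin ModifyTime).
	modifyTimeNanos := time.Now().Add(-6 * time.Hour).UnixNano()
	fmt.Println(time.Unix(0, modifyTimeNanos).Before(cutoff)) // true: 6h old vs 4h cutoff

	// Unix-second field (node.StatusUpdatedAt).
	statusUpdatedSecs := time.Now().Add(-6 * time.Hour).Unix()
	fmt.Println(time.Unix(statusUpdatedSecs, 0).Before(cutoff)) // true

	// Field that is already a time.Time (ACL token CreateTime).
	createTime := time.Now().Add(-6 * time.Hour)
	fmt.Println(createTime.Before(cutoff)) // true
}
```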