diff --git a/internal/controller/monitor/monitor.go b/internal/controller/monitor/monitor.go index 00f3583e..a748b2c1 100644 --- a/internal/controller/monitor/monitor.go +++ b/internal/controller/monitor/monitor.go @@ -32,8 +32,16 @@ type Config struct { Tags []string } +type Job struct { + // The job information. + *api.CommandJob + + // Closed when the job information becomes stale. + StaleCh <-chan struct{} +} + type JobHandler interface { - Create(context.Context, *api.CommandJob) error + Create(context.Context, Job) error } func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, error) { @@ -159,35 +167,54 @@ func (m *Monitor) Start(ctx context.Context, handler JobHandler) <-chan error { return } - jobs := resp.CommandJobs() + m.createJobs(ctx, logger, handler, agentTags, resp.CommandJobs()) + } + }() - // TODO: sort by ScheduledAt in the API - sort.Slice(jobs, func(i, j int) bool { - return jobs[i].ScheduledAt.Before(jobs[j].ScheduledAt) - }) + return errs +} - for _, job := range jobs { - jobTags := toMapAndLogErrors(logger, job.AgentQueryRules) +func (m *Monitor) createJobs(ctx context.Context, logger *zap.Logger, handler JobHandler, agentTags map[string]string, jobs []*api.JobJobTypeCommand) { + // A sneaky way to create a channel that is closed after a duration. + // Why not pass directly to handler.Create? Because that might + // interrupt scheduling a pod, when all we want is to bound the + // time spent waiting for the limiter. + staleCtx, staleCancel := context.WithTimeout(ctx, m.cfg.PollInterval) + defer staleCancel() + + // TODO: sort by ScheduledAt in the API + sort.Slice(jobs, func(i, j int) bool { + return jobs[i].ScheduledAt.Before(jobs[j].ScheduledAt) + }) + + for _, j := range jobs { + if staleCtx.Err() != nil { + // Results already stale; try again later. 
+ return + } - // The api returns jobs that match ANY agent tags (the agent query rules) - // However, we can only acquire jobs that match ALL agent tags - if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) { - logger.Debug("skipping job because it did not match all tags", zap.Any("job", job)) - continue - } + jobTags := toMapAndLogErrors(logger, j.AgentQueryRules) - logger.Debug("creating job", zap.String("uuid", job.Uuid)) - if err := handler.Create(ctx, &job.CommandJob); err != nil { - if ctx.Err() != nil { - return - } - logger.Error("failed to create job", zap.Error(err)) - } - } + // The api returns jobs that match ANY agent tags (the agent query rules) + // However, we can only acquire jobs that match ALL agent tags + if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) { + logger.Debug("skipping job because it did not match all tags", zap.Any("job", j)) + continue } - }() - return errs + job := Job{ + CommandJob: &j.CommandJob, + StaleCh: staleCtx.Done(), + } + + logger.Debug("creating job", zap.String("uuid", j.Uuid)) + if err := handler.Create(ctx, job); err != nil { + if ctx.Err() != nil { + return + } + logger.Error("failed to create job", zap.Error(err)) + } + } } func encodeClusterGraphQLID(clusterUUID string) string { diff --git a/internal/controller/scheduler/limiter.go b/internal/controller/scheduler/limiter.go index c16e240c..5c04ba36 100644 --- a/internal/controller/scheduler/limiter.go +++ b/internal/controller/scheduler/limiter.go @@ -5,7 +5,6 @@ import ( "fmt" "sync" - "github.com/buildkite/agent-stack-k8s/v2/api" "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config" "github.com/buildkite/agent-stack-k8s/v2/internal/controller/monitor" @@ -78,7 +77,7 @@ func (l *MaxInFlightLimiter) RegisterInformer(ctx context.Context, factory infor // Create either creates the job immediately, or blocks until there is capacity. // It may also ignore the job if it is already in flight. -func (l *MaxInFlightLimiter) Create(ctx context.Context, job *api.CommandJob) error { +func (l *MaxInFlightLimiter) Create(ctx context.Context, job monitor.Job) error { uuid, err := uuid.Parse(job.Uuid) if err != nil { l.logger.Error("invalid UUID in CommandJob", zap.Error(err)) @@ -93,7 +92,8 @@ func (l *MaxInFlightLimiter) Create(ctx context.Context, job *api.CommandJob) er return nil } - // Block until there's a token in the bucket. + // Block until there's a token in the bucket, or cancel if the job + // information becomes too stale. select { case <-ctx.Done(): return context.Cause(ctx) @@ -103,6 +103,16 @@ func (l *MaxInFlightLimiter) Create(ctx context.Context, job *api.CommandJob) er zap.String("uuid", uuid.String()), zap.Int("available-tokens", len(l.tokenBucket)), ) + + case <-job.StaleCh: + // The job is _not_ about to be in-flight. + numInFlight, _ := l.casa(uuid, false) + l.logger.Debug("Create: timed out waiting for token", + zap.String("uuid", uuid.String()), + zap.Int("num-in-flight", numInFlight), + zap.Int("available-tokens", len(l.tokenBucket)), + ) + return nil } // We got a token from the bucket above! Proceed to schedule the pod. 
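
The limiter change above leans on a small Go pattern: context.WithTimeout yields a Done() channel that is closed after the poll interval, and the select treats that channel as a third outcome alongside context cancellation and winning a token. Below is a minimal, self-contained sketch of that pattern; acquire and the token bucket are illustrative stand-ins, not the controller's real API.

package main

import (
	"context"
	"fmt"
	"time"
)

// acquire waits for a token from the bucket, but gives up early if staleCh
// closes first or the context is cancelled. It reports whether a token was
// actually obtained.
func acquire(ctx context.Context, tokens chan struct{}, staleCh <-chan struct{}) bool {
	select {
	case <-ctx.Done():
		return false
	case <-staleCh:
		// The poll results are stale: drop the job and let the next
		// poll re-discover it, rather than racing to create a pod.
		return false
	case <-tokens:
		return true
	}
}

func main() {
	// An empty bucket models a controller already at max-in-flight.
	tokens := make(chan struct{}, 1)

	// Stand-in for staleCtx in createJobs: its Done() channel closes
	// after the (here, shortened) poll interval.
	staleCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	if !acquire(context.Background(), tokens, staleCtx.Done()) {
		fmt.Println("gave up waiting for a token; the job will be re-discovered on the next poll")
	}
}

With the bucket empty, the call returns after roughly the timeout instead of blocking until the parent context is cancelled, which is the bound on limiter wait time the stale channel is meant to provide.
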
diff --git a/internal/controller/scheduler/limiter_test.go b/internal/controller/scheduler/limiter_test.go index 39262d0e..82ef4fb2 100644 --- a/internal/controller/scheduler/limiter_test.go +++ b/internal/controller/scheduler/limiter_test.go @@ -10,6 +10,7 @@ import ( "github.com/buildkite/agent-stack-k8s/v2/api" "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config" + "github.com/buildkite/agent-stack-k8s/v2/internal/controller/monitor" "github.com/buildkite/agent-stack-k8s/v2/internal/controller/scheduler" "github.com/google/uuid" @@ -36,7 +37,7 @@ type fakeScheduler struct { errors int } -func (f *fakeScheduler) Create(_ context.Context, job *api.CommandJob) error { +func (f *fakeScheduler) Create(_ context.Context, job monitor.Job) error { f.mu.Lock() defer f.mu.Unlock() if f.err != nil { @@ -103,7 +104,7 @@ func TestLimiter(t *testing.T) { for range 50 { go func() { defer wg.Done() - err := limiter.Create(ctx, &api.CommandJob{Uuid: uuid.New().String()}) + err := limiter.Create(ctx, monitor.Job{CommandJob: &api.CommandJob{Uuid: uuid.New().String()}}) if err != nil { t.Errorf("limiter.Create(ctx, &job) = %v", err) } @@ -140,7 +141,7 @@ func TestLimiter_SkipsDuplicateJobs(t *testing.T) { uuid := uuid.New().String() for range 50 { - if err := limiter.Create(ctx, &api.CommandJob{Uuid: uuid}); err != nil { + if err := limiter.Create(ctx, monitor.Job{CommandJob: &api.CommandJob{Uuid: uuid}}); err != nil { t.Errorf("limiter.Create(ctx, &job) = %v", err) } } @@ -170,7 +171,7 @@ func TestLimiter_SkipsCreateErrors(t *testing.T) { handler.limiter = limiter for range 50 { - err := limiter.Create(ctx, &api.CommandJob{Uuid: uuid.New().String()}) + err := limiter.Create(ctx, monitor.Job{CommandJob: &api.CommandJob{Uuid: uuid.New().String()}}) if !errors.Is(err, handler.err) { t.Errorf("limiter.Create(ctx, some-job) error = %v, want %v", err, handler.err) } diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go index 065b3bb9..8de641ef 100644 --- a/internal/controller/scheduler/scheduler.go +++ b/internal/controller/scheduler/scheduler.go @@ -14,6 +14,7 @@ import ( "github.com/buildkite/agent-stack-k8s/v2/api" "github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags" "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config" + "github.com/buildkite/agent-stack-k8s/v2/internal/controller/monitor" "github.com/buildkite/agent-stack-k8s/v2/internal/version" "github.com/buildkite/agent/v3/clicommand" @@ -85,11 +86,11 @@ type worker struct { logger *zap.Logger } -func (w *worker) Create(ctx context.Context, job *api.CommandJob) error { +func (w *worker) Create(ctx context.Context, job monitor.Job) error { logger := w.logger.With(zap.String("uuid", job.Uuid)) logger.Info("creating job") - inputs, err := w.ParseJob(job) + inputs, err := w.ParseJob(job.CommandJob) if err != nil { logger.Warn("Job parsing failed, failing job", zap.Error(err)) return w.failJob(ctx, inputs, fmt.Sprintf("agent-stack-k8s failed to parse the job: %v", err))
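
For callers implementing the updated JobHandler interface, StaleCh is advisory: it bounds time spent waiting before work begins rather than interrupting work already in progress. In this diff only MaxInFlightLimiter.Create selects on it, and the comment in createJobs makes the intent explicit: bound the wait in the limiter without interrupting pod scheduling. Below is a minimal sketch of a handler consulting StaleCh before committing to work, using stand-in types (CommandJob, Job, dropStaleHandler and the UUID are placeholders, not the controller's api or monitor packages).

package main

import (
	"context"
	"fmt"
	"time"
)

// Stand-in for api.CommandJob.
type CommandJob struct{ Uuid string }

// Stand-in for monitor.Job.
type Job struct {
	*CommandJob
	StaleCh <-chan struct{} // closed when the job information becomes stale
}

type JobHandler interface {
	Create(context.Context, Job) error
}

// dropStaleHandler declines to start work once the job information is stale,
// in the same spirit as the limiter change.
type dropStaleHandler struct{}

func (dropStaleHandler) Create(ctx context.Context, job Job) error {
	select {
	case <-job.StaleCh:
		fmt.Printf("job %s: stale, deferring to the next poll\n", job.Uuid)
		return nil
	default:
	}
	fmt.Printf("job %s: creating\n", job.Uuid)
	return nil
}

func main() {
	staleCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	var h JobHandler = dropStaleHandler{}
	_ = h.Create(context.Background(), Job{
		CommandJob: &CommandJob{Uuid: "00000000-0000-0000-0000-000000000000"},
		StaleCh:    staleCtx.Done(),
	})
}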