Don't schedule using stale job info #389

Merged · 3 commits · Oct 6, 2024
75 changes: 51 additions & 24 deletions internal/controller/monitor/monitor.go
@@ -32,8 +32,16 @@ type Config struct {
     Tags []string
 }
 
+type Job struct {
+    // The job information.
+    *api.CommandJob
+
+    // Closed when the job information becomes stale.
+    StaleCh <-chan struct{}

Contributor:

Hm, I thought the fix would be to double-check the cancellation/etc. state of the Job/Build before actually scheduling it onto K8s.

Having a callback (channel) for when the information becomes stale seems like a much more complicated way to achieve that, with unclear benefits, and it requires running a polling loop to keep the channel updated, which will load up the Buildkite backend quite a lot with these polling checks…

Sorry, I'm not very proficient with Golang; can you please help me understand why this approach is better? Thanks!

@DrJosh9000 (Contributor, Author) on Oct 1, 2024:

If there were no limiter in place, Buildkite would receive a query every PollInterval for jobs to run (call this the query loop), and the monitor would loop over the result (the inner loop), calling handler.Create for each job.

In the previous PR on the limiter, I changed how the limiter worked to introduce blocking on the "token bucket", but this causes the bug: a job returned from the query a long time ago could be the next one the inner loop in the Monitor passes to handler.Create, so that by the time the limiter finally gets a token, the job could already be cancelled.

With the StaleCh approach, the limiter can block until either there's a token or the job information is stale. If it's stale, the limiter shouldn't pass the job on to the scheduler and can return early. The query loop can then wait until the next PollInterval to run the main query; no extra query is needed.

Double-checking the cancellation state of the job before passing it on to the scheduler, or double-checking within the scheduler, would mean making another query to Buildkite at that point.

So while I think my approach doesn't look particularly clean (I could probably make it look nicer), it does avoid the extra query.
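
For illustration, the pattern described here (and implemented in limiter.go further down) boils down to a three-way select. This is only a condensed sketch; tokenBucket and staleCh are illustrative stand-ins rather than the real fields:

package sketch

import "context"

// acquireOrSkip blocks until a token is free, the controller shuts down,
// or the job information goes stale, whichever happens first.
func acquireOrSkip(ctx context.Context, tokenBucket <-chan struct{}, staleCh <-chan struct{}) (bool, error) {
    select {
    case <-ctx.Done():
        // Controller is shutting down.
        return false, ctx.Err()
    case <-staleCh:
        // Job info is stale: skip scheduling; the next poll re-fetches the job if it still needs to run.
        return false, nil
    case <-tokenBucket:
        // Got a token: the caller goes on to schedule the pod.
        return true, nil
    }
}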

Contributor:

I see, so the idea is to keep the query loop as the main source of truth for up-to-date data and to avoid creating a separate code path for explicit job-state refresh? Sounds reasonable!

I guess coming from Reactive Streams I'd expect one channel to be enough to either provide a valid object or indicate cancellation/staleness by being closed. The "query loop" logic would then distribute updates about jobs to the interested channels, or close those channels when a job is cancelled.

Your approach seems to solve that, so there's no issue; just thinking out loud :)
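
For comparison only, a rough sketch of the single-channel alternative described in this comment; this is not what the PR implements, and the type and function names here are hypothetical:

package sketch

import "github.com/buildkite/agent-stack-k8s/v2/api"

// jobUpdates is the hypothetical single-channel shape: the query loop would
// send refreshed job info on it and close it on cancellation or staleness.
type jobUpdates <-chan *api.CommandJob

// consume shows how a subscriber would read it: each received value is the
// latest job info; a closed channel means the job is gone.
func consume(updates jobUpdates) {
    for job := range updates {
        _ = job // schedule or re-validate using the freshest info
    }
    // Channel closed: the job was cancelled or became stale.
}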

+}
+
 type JobHandler interface {
-    Create(context.Context, *api.CommandJob) error
+    Create(context.Context, Job) error
 }
 
 func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, error) {
@@ -159,35 +167,54 @@ func (m *Monitor) Start(ctx context.Context, handler JobHandler) <-chan error {
                 return
             }
 
-            jobs := resp.CommandJobs()
-
-            // TODO: sort by ScheduledAt in the API
-            sort.Slice(jobs, func(i, j int) bool {
-                return jobs[i].ScheduledAt.Before(jobs[j].ScheduledAt)
-            })
-
-            for _, job := range jobs {
-                jobTags := toMapAndLogErrors(logger, job.AgentQueryRules)
-
-                // The api returns jobs that match ANY agent tags (the agent query rules)
-                // However, we can only acquire jobs that match ALL agent tags
-                if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
-                    logger.Debug("skipping job because it did not match all tags", zap.Any("job", job))
-                    continue
-                }
-
-                logger.Debug("creating job", zap.String("uuid", job.Uuid))
-                if err := handler.Create(ctx, &job.CommandJob); err != nil {
-                    if ctx.Err() != nil {
-                        return
-                    }
-                    logger.Error("failed to create job", zap.Error(err))
-                }
-            }
+            m.createJobs(ctx, logger, handler, agentTags, resp.CommandJobs())
         }
     }()
 
     return errs
 }
 
+func (m *Monitor) createJobs(ctx context.Context, logger *zap.Logger, handler JobHandler, agentTags map[string]string, jobs []*api.JobJobTypeCommand) {
+    // A sneaky way to create a channel that is closed after a duration.
+    // Why not pass directly to handler.Create? Because that might
+    // interrupt scheduling a pod, when all we want is to bound the
+    // time spent waiting for the limiter.
+    staleCtx, staleCancel := context.WithTimeout(ctx, m.cfg.PollInterval)
+    defer staleCancel()
+
+    // TODO: sort by ScheduledAt in the API
+    sort.Slice(jobs, func(i, j int) bool {
+        return jobs[i].ScheduledAt.Before(jobs[j].ScheduledAt)
+    })
+
+    for _, j := range jobs {
+        if staleCtx.Err() != nil {
+            // Results already stale; try again later.
+            return
+        }
+
+        jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)
+
+        // The api returns jobs that match ANY agent tags (the agent query rules)
+        // However, we can only acquire jobs that match ALL agent tags
+        if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
+            logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
+            continue
+        }
+
+        job := Job{
+            CommandJob: &j.CommandJob,
+            StaleCh:    staleCtx.Done(),
+        }
+
+        logger.Debug("creating job", zap.String("uuid", j.Uuid))
+        if err := handler.Create(ctx, job); err != nil {
+            if ctx.Err() != nil {
+                return
+            }
+            logger.Error("failed to create job", zap.Error(err))
+        }
+    }
+}
 
 func encodeClusterGraphQLID(clusterUUID string) string {
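
The "sneaky way to create a channel that is closed after a duration" in createJobs relies on a standard property of the context package: Done() returns a channel that is closed once the deadline passes or the context is cancelled. A small self-contained illustration (the durations are made up, not taken from the controller):

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // context.WithTimeout arranges for Done() to be closed at the deadline,
    // so staleCh below is "a channel that closes itself after 2 seconds"
    // with no extra goroutine of our own.
    staleCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel() // always release the context's timer

    staleCh := staleCtx.Done() // <-chan struct{}, closed at the deadline (or on cancel)

    select {
    case <-staleCh:
        fmt.Println("job info went stale before a token arrived")
    case <-time.After(1 * time.Second): // stand-in for "a token became available"
        fmt.Println("got a token in time")
    }
}
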
16 changes: 13 additions & 3 deletions internal/controller/scheduler/limiter.go
@@ -5,7 +5,6 @@ import (
     "fmt"
     "sync"
 
-    "github.com/buildkite/agent-stack-k8s/v2/api"
     "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
     "github.com/buildkite/agent-stack-k8s/v2/internal/controller/monitor"
 
@@ -78,7 +77,7 @@ func (l *MaxInFlightLimiter) RegisterInformer(ctx context.Context, factory infor
 
 // Create either creates the job immediately, or blocks until there is capacity.
 // It may also ignore the job if it is already in flight.
-func (l *MaxInFlightLimiter) Create(ctx context.Context, job *api.CommandJob) error {
+func (l *MaxInFlightLimiter) Create(ctx context.Context, job monitor.Job) error {
     uuid, err := uuid.Parse(job.Uuid)
     if err != nil {
         l.logger.Error("invalid UUID in CommandJob", zap.Error(err))
@@ -93,7 +92,8 @@ func (l *MaxInFlightLimiter) Create(ctx context.Context, job *api.CommandJob) er
         return nil
     }
 
-    // Block until there's a token in the bucket.
+    // Block until there's a token in the bucket, or cancel if the job
+    // information becomes too stale.
     select {
     case <-ctx.Done():
         return context.Cause(ctx)
@@ -103,6 +103,16 @@ func (l *MaxInFlightLimiter) Create(ctx context.Context, job *api.CommandJob) er
             zap.String("uuid", uuid.String()),
             zap.Int("available-tokens", len(l.tokenBucket)),
         )
+
+    case <-job.StaleCh:
+        // The job is _not_ about to be in-flight.
+        numInFlight, _ := l.casa(uuid, false)
+        l.logger.Debug("Create: timed out waiting for token",
+            zap.String("uuid", uuid.String()),
+            zap.Int("num-in-flight", numInFlight),
+            zap.Int("available-tokens", len(l.tokenBucket)),
+        )
+        return nil
     }
 
     // We got a token from the bucket above! Proceed to schedule the pod.
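
The token bucket the limiter draws from appears to be a buffered channel (the log lines read the number of available tokens with len(l.tokenBucket)). As a rough sketch of that idiom, independent of the real MaxInFlightLimiter and with illustrative names:

package sketch

// bucket is a token bucket built from a buffered channel whose capacity is
// the max-in-flight limit; len(b) is the number of available tokens.
type bucket chan struct{}

func newBucket(maxInFlight int) bucket {
    b := make(bucket, maxInFlight)
    for range maxInFlight { // pre-fill with tokens
        b <- struct{}{}
    }
    return b
}

func (b bucket) acquire() { <-b }             // blocks while no tokens remain
func (b bucket) release() { b <- struct{}{} } // return a token when a job finishes
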
9 changes: 5 additions & 4 deletions internal/controller/scheduler/limiter_test.go
@@ -10,6 +10,7 @@ import (
 
     "github.com/buildkite/agent-stack-k8s/v2/api"
     "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
+    "github.com/buildkite/agent-stack-k8s/v2/internal/controller/monitor"
     "github.com/buildkite/agent-stack-k8s/v2/internal/controller/scheduler"
 
     "github.com/google/uuid"
@@ -36,7 +37,7 @@ type fakeScheduler struct {
     errors int
 }
 
-func (f *fakeScheduler) Create(_ context.Context, job *api.CommandJob) error {
+func (f *fakeScheduler) Create(_ context.Context, job monitor.Job) error {
     f.mu.Lock()
     defer f.mu.Unlock()
     if f.err != nil {
@@ -103,7 +104,7 @@ func TestLimiter(t *testing.T) {
     for range 50 {
         go func() {
             defer wg.Done()
-            err := limiter.Create(ctx, &api.CommandJob{Uuid: uuid.New().String()})
+            err := limiter.Create(ctx, monitor.Job{CommandJob: &api.CommandJob{Uuid: uuid.New().String()}})
             if err != nil {
                 t.Errorf("limiter.Create(ctx, &job) = %v", err)
             }
@@ -140,7 +141,7 @@ func TestLimiter_SkipsDuplicateJobs(t *testing.T) {
     uuid := uuid.New().String()
 
     for range 50 {
-        if err := limiter.Create(ctx, &api.CommandJob{Uuid: uuid}); err != nil {
+        if err := limiter.Create(ctx, monitor.Job{CommandJob: &api.CommandJob{Uuid: uuid}}); err != nil {
             t.Errorf("limiter.Create(ctx, &job) = %v", err)
         }
     }
@@ -170,7 +171,7 @@ func TestLimiter_SkipsCreateErrors(t *testing.T) {
     handler.limiter = limiter
 
     for range 50 {
-        err := limiter.Create(ctx, &api.CommandJob{Uuid: uuid.New().String()})
+        err := limiter.Create(ctx, monitor.Job{CommandJob: &api.CommandJob{Uuid: uuid.New().String()}})
         if !errors.Is(err, handler.err) {
             t.Errorf("limiter.Create(ctx, some-job) error = %v, want %v", err, handler.err)
         }
5 changes: 3 additions & 2 deletions internal/controller/scheduler/scheduler.go
@@ -14,6 +14,7 @@ import (
     "github.com/buildkite/agent-stack-k8s/v2/api"
     "github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
     "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
+    "github.com/buildkite/agent-stack-k8s/v2/internal/controller/monitor"
     "github.com/buildkite/agent-stack-k8s/v2/internal/version"
 
     "github.com/buildkite/agent/v3/clicommand"
@@ -85,11 +86,11 @@ type worker struct {
     logger *zap.Logger
 }
 
-func (w *worker) Create(ctx context.Context, job *api.CommandJob) error {
+func (w *worker) Create(ctx context.Context, job monitor.Job) error {
     logger := w.logger.With(zap.String("uuid", job.Uuid))
     logger.Info("creating job")
 
-    inputs, err := w.ParseJob(job)
+    inputs, err := w.ParseJob(job.CommandJob)
     if err != nil {
         logger.Warn("Job parsing failed, failing job", zap.Error(err))
         return w.failJob(ctx, inputs, fmt.Sprintf("agent-stack-k8s failed to parse the job: %v", err))