Skip to content

Commit

Permalink
Use init containers to early-check image pull
Browse files Browse the repository at this point in the history
  • Loading branch information
DrJosh9000 committed Aug 13, 2024
1 parent 17ae5f3 commit 0c76f91
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 96 deletions.
22 changes: 17 additions & 5 deletions charts/agent-stack-k8s/templates/rbac.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,25 @@ rules:
- create
- update
- apiGroups:
- ""
- ""
resources:
- pods
- pods
verbs:
- get
- list
- watch
- get
- list
- watch
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- apiGroups:
- ""
resources:
- pods/eviction
verbs:
- create
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/scheduler/fail_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ func failJob(
zapLogger *zap.Logger,
agentToken string,
jobUUID string,
tags []string,
message string,
) error {
ctr, err := agentcore.NewController(
ctx,
agentToken,
kjobName(jobUUID),
nil, // Not bothering with tags, since we're acquiring the job
tags, // queue is required for acquire! maybe more
agentcore.WithUserAgent("agent-stack-k8s/"+version.Version()),
agentcore.WithLogger(logger.NewConsoleLogger(logger.NewTextPrinter(os.Stderr), func(int) {})),
)
Expand Down
198 changes: 163 additions & 35 deletions internal/controller/scheduler/imagePullBackOffWatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@ package scheduler

import (
"context"
"errors"
"fmt"
"regexp"
"slices"
"strings"
"time"

"github.com/Khan/genqlient/graphql"
"github.com/buildkite/agent-stack-k8s/v2/api"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"

agentcore "github.com/buildkite/agent/v3/core"

"github.com/Khan/genqlient/graphql"
"github.com/google/uuid"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/tools/cache"
Expand All @@ -21,10 +28,15 @@ type imagePullBackOffWatcher struct {
logger *zap.Logger
k8s kubernetes.Interface
gql graphql.Client
cfg *config.Config

// The imagePullBackOffWatcher waits at least this duration after pod
// creation before it cancels the job.
gracePeriod time.Duration

// Jobs that we've failed, cancelled, or were found to be in a terminal
// state.
ignoreJobs map[uuid.UUID]struct{}
}

// NewImagePullBackOffWatcher creates an informer that will use the Buildkite
Expand All @@ -39,7 +51,9 @@ func NewImagePullBackOffWatcher(
logger: logger,
k8s: k8s,
gql: api.NewClient(cfg.BuildkiteToken),
cfg: cfg,
gracePeriod: cfg.ImagePullBackOffGradePeriod,
ignoreJobs: make(map[uuid.UUID]struct{}),
}
}

Expand All @@ -59,7 +73,7 @@ func (w *imagePullBackOffWatcher) RegisterInformer(
// OnDelete is intentionally a no-op: a deleted pod requires no
// ImagePullBackOff handling.
func (w *imagePullBackOffWatcher) OnDelete(obj any) {}

func (w *imagePullBackOffWatcher) OnAdd(maybePod any, isInInitialList bool) {
pod, wasPod := maybePod.(*v1.Pod)
pod, wasPod := maybePod.(*corev1.Pod)
if !wasPod {
return
}
Expand All @@ -68,8 +82,8 @@ func (w *imagePullBackOffWatcher) OnAdd(maybePod any, isInInitialList bool) {
}

func (w *imagePullBackOffWatcher) OnUpdate(oldMaybePod, newMaybePod any) {
oldPod, oldWasPod := newMaybePod.(*v1.Pod)
newPod, newWasPod := newMaybePod.(*v1.Pod)
oldPod, oldWasPod := newMaybePod.(*corev1.Pod)
newPod, newWasPod := newMaybePod.(*corev1.Pod)

// This nonsense statement is only necessary because the types are too loose.
// Most likely both old and new are going to be Pods.
Expand All @@ -80,7 +94,7 @@ func (w *imagePullBackOffWatcher) OnUpdate(oldMaybePod, newMaybePod any) {
}
}

func (w *imagePullBackOffWatcher) cancelImagePullBackOff(ctx context.Context, pod *v1.Pod) {
func (w *imagePullBackOffWatcher) cancelImagePullBackOff(ctx context.Context, pod *corev1.Pod) {
log := w.logger.With(zap.String("namespace", pod.Namespace), zap.String("podName", pod.Name))
log.Debug("Checking pod for ImagePullBackOff")

Expand All @@ -94,8 +108,7 @@ func (w *imagePullBackOffWatcher) cancelImagePullBackOff(ctx context.Context, po
return
}

clientMutationId := pod.GetName()
rawJobUUID, exists := pod.GetLabels()[config.UUIDLabel]
rawJobUUID, exists := pod.Labels[config.UUIDLabel]
if !exists {
log.Info("Job UUID label not present. Skipping.")
return
Expand All @@ -109,47 +122,162 @@ func (w *imagePullBackOffWatcher) cancelImagePullBackOff(ctx context.Context, po

log = log.With(zap.String("jobUUID", jobUUID.String()))

if _, done := w.ignoreJobs[jobUUID]; done {
log.Debug("Job already failed, canceled, or wasn't in a failable/cancellable state")
return
}

images := make(map[string]struct{})

// If any init container fails to pull, whether it's one we added
// specifically to check for pull failure, the pod won't run.
for _, containerStatus := range pod.Status.InitContainerStatuses {
if !shouldCancel(&containerStatus) {
continue
}
images[containerStatus.Image] = struct{}{}
}

// These containers only run after the init containers have run.
// Theoretically this could still happen even if all the init containers
// successfully pulled.
for _, containerStatus := range pod.Status.ContainerStatuses {
if !shouldCancel(&containerStatus) {
continue
}
if !isSystemContainer(&containerStatus) {
log.Info("Ignoring sidecar container during ImagePullBackOff watch.", zap.String("name", containerStatus.Name))
log.Info("Ignoring container during ImagePullBackOff watch.", zap.String("name", containerStatus.Name))
continue
}
images[containerStatus.Image] = struct{}{}
}

if len(images) == 0 {
// All's well with the world.
return
}

// Get the current job state from BK.
// What we do next depends on what state it is in.
resp, err := api.GetCommandJob(ctx, w.gql, jobUUID.String())
if err != nil {
log.Warn("Failed to query command job", zap.Error(err))
return
}
job, ok := resp.Job.(*api.GetCommandJobJobJobTypeCommand)
if !ok {
log.Warn("Job was not a command job")
return
}

log = log.With(zap.String("job_state", string(job.State)))

switch job.State {
case api.JobStatesScheduled:
// We can acquire it and fail it ourselves.
log.Info("One or more job containers are in ImagePullBackOff. Failing.")
w.failJob(ctx, log, pod, jobUUID, images)

log.Info("Job has a container in ImagePullBackOff. Cancelling.")
case api.JobStatesAccepted, api.JobStatesAssigned, api.JobStatesRunning:
// An agent is already doing something with the job - now canceling
// is the only lever available.
log.Info("One or more job containers are in ImagePullBackOff. Cancelling.")
w.cancelJob(ctx, log, pod, jobUUID)

resp, err := api.GetCommandJob(ctx, w.gql, jobUUID.String())
if err != nil {
log.Warn("Failed to query command job", zap.Error(err))
return
case api.JobStatesCanceling, api.JobStatesCanceled, api.JobStatesFinished, api.JobStatesSkipped:
// If the job is in one of these states, we can neither acquire nor
// cancel it (now or in the future).
log.Debug("Job not acquirable or cancelable")
w.ignoreJobs[jobUUID] = struct{}{}

default:
// Most states don't make sense for a command job that we've started
// a pod for (e.g. blocked, broken, expired, pending, waiting, ...)
// Maybe the meanings of states has changed since this build?
// Log a message but don't do anything.
log.Warn("Job not in actionable state")
}
}

func (w *imagePullBackOffWatcher) failJob(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID, images map[string]struct{}) {
agentToken, err := fetchAgentToken(ctx, w.logger, w.k8s, w.cfg.Namespace, w.cfg.AgentTokenSecret)
if err != nil {
log.Error("Couldn't fetch agent token in order to fail the job", zap.Error(err))
return
}

// Format the failed images into a nice list.
imagesList := make([]string, 0, len(images))
for image := range images {
imagesList = append(imagesList, image)
}
slices.Sort(imagesList)
var message strings.Builder
message.WriteString("The following container images couldn't be pulled:\n")
for _, image := range imagesList {
fmt.Fprintf(&message, " * %s\n", image)
}

// Tags are required order to connect the agent.
var tags []string
for key, value := range pod.Labels {
k, has := strings.CutPrefix(key, "tag.buildkite.com/")
if !has {
continue
}
tags = append(tags, fmt.Sprintf("%s=%s", k, value))
}

switch job := resp.GetJob().(type) {
case *api.GetCommandJobJobJobTypeCommand:
// This is expected as there will be a gap between when cancel request completes and
// the Kubernetes job is cleaned up, during which more pods with containers destined to
// ImagePullBackOff may be created.
if job.GetState() == api.JobStatesCanceled || job.GetState() == api.JobStatesCanceling {
return
}

if _, err := api.CancelCommandJob(ctx, w.gql, api.JobTypeCommandCancelInput{
ClientMutationId: clientMutationId,
Id: job.GetId(),
}); err != nil {
log.Warn("Failed to cancel command job", zap.Error(err), zap.String("state", string(job.GetState())))
}
return
default:
log.Warn("Job was not a command job")
return
if err := failJob(ctx, w.logger, agentToken, jobUUID.String(), tags, message.String()); err != nil {
log.Error("Couldn't fail the job", zap.Error(err))
// If the error was because BK rejected the acquisition, then its moved
// on to a state where we need to cancel instead.
if errors.Is(err, agentcore.ErrJobAcquisitionRejected) {
log.Info("Attempting to cancel job instead")
w.cancelJob(ctx, log, pod, jobUUID)
}
return
}

// Let's also evict the pod (request graceful termination).
eviction := &policyv1.Eviction{
ObjectMeta: pod.ObjectMeta,
}
if err := w.k8s.PolicyV1().Evictions(w.cfg.Namespace).Evict(ctx, eviction); err != nil {
log.Error("Couldn't evict pod", zap.Error(err))
}

// Because eviction isn't instantaneous, the pod can continue to exist
// for a bit. Record that we've failed the job to avoid trying to fail
// it again.
w.ignoreJobs[jobUUID] = struct{}{}
}

// cancelJob asks Buildkite (via GraphQL) to cancel the job. When the mutation
// succeeds, the job UUID is recorded in ignoreJobs so the watcher skips
// further work on it; when it fails, nothing is recorded so a later pod
// update can retry the cancellation.
func (w *imagePullBackOffWatcher) cancelJob(ctx context.Context, log *zap.Logger, pod *corev1.Pod, jobUUID uuid.UUID) {
	input := api.JobTypeCommandCancelInput{
		ClientMutationId: pod.Name,
		Id:               jobUUID.String(),
	}
	if _, err := api.CancelCommandJob(ctx, w.gql, input); err != nil {
		log.Warn("Failed to cancel command job", zap.Error(err))
		// Could be network problems
		// Could be in non-cancelable state
		// Try again later?
		return
	}

	// Evicting the pod might prevent the agent from logging its last-gasp
	// "it could be ImagePullBackOff" message.
	// On the other hand, not evicting the pod will probably leave it running
	// indefinitely if there are any sidecars.
	// TODO: experiment with adding eviction here.

	// We can avoid repeating the GraphQL queries to fetch and cancel the job
	// (between cancelling and Kubernetes cleaning up the pod) if we got here.
	w.ignoreJobs[jobUUID] = struct{}{}
}

func shouldCancel(containerStatus *v1.ContainerStatus) bool {
func shouldCancel(containerStatus *corev1.ContainerStatus) bool {
return containerStatus.State.Waiting != nil &&
containerStatus.State.Waiting.Reason == "ImagePullBackOff"
}
Expand All @@ -164,7 +292,7 @@ func shouldCancel(containerStatus *v1.ContainerStatus) bool {
// Most importantly, the CI can still pass (in theory) even if sidecars fail.
//
// (The name "system container" is subject to more debate.)
func isSystemContainer(containerStatus *v1.ContainerStatus) bool {
func isSystemContainer(containerStatus *corev1.ContainerStatus) bool {
name := containerStatus.Name
if slices.Contains([]string{AgentContainerName, CopyAgentContainerName, CheckoutContainerName}, name) {
return true
Expand Down
Loading

0 comments on commit 0c76f91

Please sign in to comment.