Use init containers to early-check image pull
DrJosh9000 committed Aug 8, 2024
Parent: 17ae5f3. Commit: bfb9471.
Showing 8 changed files with 249 additions and 70 deletions.
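
The three diffs shown below cover the RBAC rules, the job-failing helper, and the ImagePullBackOff watcher; the pod-building code that actually injects the image pull check init containers presumably lives in one of the five changed files that are not shown here. As a rough sketch only (the container name prefix value, the command, and the wiring are assumptions, not taken from this diff), such an init container could look like this:

package example

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
)

// Hypothetical value: the real ImagePullCheckContainerNamePrefix constant is
// defined in the scheduler package and is not visible in this excerpt.
const imagePullCheckContainerNamePrefix = "imagepullcheck-"

// pullCheckInitContainer builds an init container whose only job is to force
// the kubelet to pull the given image before the job containers start. If the
// pull fails, the init container sits in ImagePullBackOff, which the watcher
// below detects and turns into a failed Buildkite job.
func pullCheckInitContainer(index int, image string) corev1.Container {
    return corev1.Container{
        Name:    fmt.Sprintf("%s%d", imagePullCheckContainerNamePrefix, index),
        Image:   image,
        Command: []string{"true"}, // no-op: only the pull matters
    }
}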

charts/agent-stack-k8s/templates/rbac.yaml.tpl (22 changes: 17 additions & 5 deletions)

@@ -14,13 +14,25 @@ rules:
   - create
   - update
 - apiGroups:
   - ""
   resources:
   - pods
   verbs:
   - get
   - list
   - watch
+- apiGroups:
+  - ""
+  resources:
+  - secrets
+  verbs:
+  - get
+- apiGroups:
+  - ""
+  resources:
+  - pod/eviction
+  verbs:
+  - create
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: RoleBinding
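
The two added rules line up with the client-go calls introduced in the watcher below: reading the agent token Secret so the job can be failed through the agent API, and creating an eviction for the stuck pod. A minimal sketch of those calls, assuming a plain kubernetes.Interface client and a made-up Secret key name (the real lookup happens inside fetchAgentToken, which is not part of this diff):

package example

import (
    "context"

    policyv1 "k8s.io/api/policy/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// agentTokenFromSecret reads the Buildkite agent token from a Secret; this is
// the call the new "secrets: get" rule permits. The "token" key is a guess.
func agentTokenFromSecret(ctx context.Context, k8s kubernetes.Interface, namespace, secretName string) (string, error) {
    secret, err := k8s.CoreV1().Secrets(namespace).Get(ctx, secretName, metav1.GetOptions{})
    if err != nil {
        return "", err
    }
    return string(secret.Data["token"]), nil
}

// evictPod requests graceful termination via the Eviction subresource; this is
// the call the new "pod/eviction: create" rule permits.
func evictPod(ctx context.Context, k8s kubernetes.Interface, namespace, podName string) error {
    eviction := &policyv1.Eviction{
        ObjectMeta: metav1.ObjectMeta{Name: podName, Namespace: namespace},
    }
    return k8s.PolicyV1().Evictions(namespace).Evict(ctx, eviction)
}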

internal/controller/scheduler/fail_job.go (3 changes: 2 additions & 1 deletion)

@@ -23,13 +23,14 @@ func failJob(
     zapLogger *zap.Logger,
     agentToken string,
     jobUUID string,
+    tags []string,
     message string,
 ) error {
     ctr, err := agentcore.NewController(
         ctx,
         agentToken,
         kjobName(jobUUID),
-        nil, // Not bothering with tags, since we're acquiring the job
+        tags, // queue is required for acquire! maybe more
         agentcore.WithUserAgent("agent-stack-k8s/"+version.Version()),
         agentcore.WithLogger(logger.NewConsoleLogger(logger.NewTextPrinter(os.Stderr), func(int) {})),
     )
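
failJob now takes the agent tags because acquiring the job needs at least the queue tag, as the updated comment notes. The watcher below rebuilds those tags from the pod's tag.buildkite.com/ labels; here is a standalone sketch of that conversion (the label values are examples only):

package main

import (
    "fmt"
    "strings"
)

// tagsFromPodLabels rebuilds Buildkite agent tags such as "queue=kubernetes"
// from pod labels of the form tag.buildkite.com/<name>=<value>, mirroring the
// loop added to the ImagePullBackOff watcher below.
func tagsFromPodLabels(labels map[string]string) []string {
    var tags []string
    for key, value := range labels {
        name, ok := strings.CutPrefix(key, "tag.buildkite.com/")
        if !ok {
            continue
        }
        tags = append(tags, fmt.Sprintf("%s=%s", name, value))
    }
    return tags
}

func main() {
    labels := map[string]string{
        "tag.buildkite.com/queue": "kubernetes", // example value
        "app":                     "agent-stack-k8s",
    }
    fmt.Println(tagsFromPodLabels(labels)) // [queue=kubernetes]
}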

internal/controller/scheduler/imagePullBackOffWatcher.go (96 changes: 87 additions & 9 deletions)

@@ -2,16 +2,19 @@ package scheduler

 import (
     "context"
+    "fmt"
     "regexp"
     "slices"
     "strings"
     "time"
 
     "github.com/Khan/genqlient/graphql"
     "github.com/buildkite/agent-stack-k8s/v2/api"
+    "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
     "github.com/google/uuid"
     "go.uber.org/zap"
-    v1 "k8s.io/api/core/v1"
+    corev1 "k8s.io/api/core/v1"
+    policyv1 "k8s.io/api/policy/v1"
     "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes"
     _ "k8s.io/client-go/tools/cache"
@@ -21,6 +24,7 @@ type imagePullBackOffWatcher struct {
     logger *zap.Logger
     k8s    kubernetes.Interface
     gql    graphql.Client
+    cfg    *config.Config
 
     // The imagePullBackOffWatcher waits at least this duration after pod
     // creation before it cancels the job.
@@ -39,6 +43,7 @@ func NewImagePullBackOffWatcher(
         logger:      logger,
         k8s:         k8s,
         gql:         api.NewClient(cfg.BuildkiteToken),
+        cfg:         cfg,
         gracePeriod: cfg.ImagePullBackOffGradePeriod,
     }
 }
@@ -59,7 +64,7 @@ func (w *imagePullBackOffWatcher) RegisterInformer(
 func (w *imagePullBackOffWatcher) OnDelete(obj any) {}
 
 func (w *imagePullBackOffWatcher) OnAdd(maybePod any, isInInitialList bool) {
-    pod, wasPod := maybePod.(*v1.Pod)
+    pod, wasPod := maybePod.(*corev1.Pod)
     if !wasPod {
         return
     }
@@ -68,8 +73,8 @@ func (w *imagePullBackOffWatcher) OnAdd(maybePod any, isInInitialList bool) {
 }
 
 func (w *imagePullBackOffWatcher) OnUpdate(oldMaybePod, newMaybePod any) {
-    oldPod, oldWasPod := newMaybePod.(*v1.Pod)
-    newPod, newWasPod := newMaybePod.(*v1.Pod)
+    oldPod, oldWasPod := newMaybePod.(*corev1.Pod)
+    newPod, newWasPod := newMaybePod.(*corev1.Pod)
 
     // This nonsense statement is only necessary because the types are too loose.
     // Most likely both old and new are going to be Pods.
@@ -80,7 +85,7 @@ func (w *imagePullBackOffWatcher) OnUpdate(oldMaybePod, newMaybePod any) {
     }
 }
 
-func (w *imagePullBackOffWatcher) cancelImagePullBackOff(ctx context.Context, pod *v1.Pod) {
+func (w *imagePullBackOffWatcher) cancelImagePullBackOff(ctx context.Context, pod *corev1.Pod) {
     log := w.logger.With(zap.String("namespace", pod.Namespace), zap.String("podName", pod.Name))
     log.Debug("Checking pod for ImagePullBackOff")
 
@@ -109,16 +114,85 @@ func (w *imagePullBackOffWatcher) cancelImagePullBackOff(ctx context.Context, po

     log = log.With(zap.String("jobUUID", jobUUID.String()))
 
+    const (
+        outcomeNothing = iota
+        outcomeFail
+        outcomeCancel
+    )
+    outcome := outcomeNothing
+    images := make(map[string]struct{})
+
+    for _, containerStatus := range pod.Status.InitContainerStatuses {
+        if !shouldCancel(&containerStatus) {
+            continue
+        }
+        if !isImagePullCheckContainer(&containerStatus) {
+            log.Info("Ignoring init container during ImagePullBackOff watch.", zap.String("name", containerStatus.Name))
+            continue
+        }
+        outcome = max(outcome, outcomeFail)
+        images[containerStatus.Image] = struct{}{}
+    }
+
+    // These containers only run after the init containers have run.
     for _, containerStatus := range pod.Status.ContainerStatuses {
         if !shouldCancel(&containerStatus) {
             continue
         }
         if !isSystemContainer(&containerStatus) {
-            log.Info("Ignoring sidecar container during ImagePullBackOff watch.", zap.String("name", containerStatus.Name))
+            log.Info("Ignoring container during ImagePullBackOff watch.", zap.String("name", containerStatus.Name))
             continue
         }
+        outcome = max(outcome, outcomeCancel)
+        images[containerStatus.Image] = struct{}{}
+    }
 
-        log.Info("Job has a container in ImagePullBackOff. Cancelling.")
+    switch outcome {
+    case outcomeFail:
+        log.Info("One or more image pull check containers are in ImagePullBackOff. Failing.")
+        agentToken, err := fetchAgentToken(ctx, w.logger, w.k8s, w.cfg.Namespace, w.cfg.AgentTokenSecret)
+        if err != nil {
+            log.Error("Couldn't fetch agent token in order to fail the job", zap.Error(err))
+            return
+        }
+
+        // Format the failed images into a nice list.
+        imagesList := make([]string, 0, len(images))
+        for image := range images {
+            imagesList = append(imagesList, image)
+        }
+        slices.Sort(imagesList)
+        var message strings.Builder
+        message.WriteString("The following container images couldn't be pulled:\n")
+        for _, image := range imagesList {
+            fmt.Fprintf(&message, " * %s\n", image)
+        }
+
+        // Need the tags...
+        var tags []string
+        for key, value := range pod.Labels {
+            k, has := strings.CutPrefix(key, "tag.buildkite.com/")
+            if !has {
+                continue
+            }
+            tags = append(tags, fmt.Sprintf("%s=%s", k, value))
+        }
+
+        if err := failJob(ctx, w.logger, agentToken, jobUUID.String(), tags, message.String()); err != nil {
+            log.Error("Couldn't fail the job", zap.Error(err))
+            return
+        }
+
+        // Let's also evict the pod (request graceful termination).
+        eviction := &policyv1.Eviction{
+            ObjectMeta: pod.ObjectMeta,
+        }
+        if err := w.k8s.PolicyV1().Evictions(w.cfg.Namespace).Evict(ctx, eviction); err != nil {
+            log.Error("Couldn't evict pod", zap.Error(err))
+        }
+
+    case outcomeCancel:
+        log.Info("One or more job containers are in ImagePullBackOff. Cancelling.")
 
         resp, err := api.GetCommandJob(ctx, w.gql, jobUUID.String())
         if err != nil {
@@ -149,11 +223,15 @@ func (w *imagePullBackOffWatcher) cancelImagePullBackOff(ctx context.Context, po
     }
 }
 
-func shouldCancel(containerStatus *v1.ContainerStatus) bool {
+func shouldCancel(containerStatus *corev1.ContainerStatus) bool {
     return containerStatus.State.Waiting != nil &&
         containerStatus.State.Waiting.Reason == "ImagePullBackOff"
 }
 
+func isImagePullCheckContainer(containerStatus *corev1.ContainerStatus) bool {
+    return strings.HasPrefix(containerStatus.Name, ImagePullCheckContainerNamePrefix)
+}
+
 // All container-\d containers will have the agent installed as their PID 1.
 // Therefore, their lifecycle is well monitored in our backend, allowing us to terminate them if they fail to start.
 //
@@ -164,7 +242,7 @@ func shouldCancel(containerStatus *v1.ContainerStatus) bool {
 // Most importantly, the CI can still pass (in theory) even if sidecars fail.
 //
 // (The name "system container" is subject to more debate.)
-func isSystemContainer(containerStatus *v1.ContainerStatus) bool {
+func isSystemContainer(containerStatus *corev1.ContainerStatus) bool {
     name := containerStatus.Name
     if slices.Contains([]string{AgentContainerName, CopyAgentContainerName, CheckoutContainerName}, name) {
         return true
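
RegisterInformer itself is only shown truncated above, so for context: a handler with OnAdd/OnUpdate/OnDelete methods like this watcher is typically attached to a shared pod informer, which is what delivers the status updates (including ImagePullBackOff transitions) that cancelImagePullBackOff inspects. A sketch of that wiring, with a placeholder namespace and resync period rather than the controller's actual configuration:

package example

import (
    "context"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

// registerPodWatcher attaches an event handler (such as the watcher above) to
// a shared pod informer scoped to one namespace, then starts the informer and
// waits for its cache to sync. The 30s resync period is a placeholder.
func registerPodWatcher(ctx context.Context, k8s kubernetes.Interface, namespace string, handler cache.ResourceEventHandler) {
    factory := informers.NewSharedInformerFactoryWithOptions(
        k8s,
        30*time.Second,
        informers.WithNamespace(namespace),
    )
    // The registration handle (and, in newer client-go, the error) is ignored
    // here for brevity.
    factory.Core().V1().Pods().Informer().AddEventHandler(handler)
    factory.Start(ctx.Done())
    factory.WaitForCacheSync(ctx.Done())
}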

(Diffs for the remaining 5 changed files in this commit are not shown here.)
