Skip to content

Commit

Permalink
update affinity assistant creation implementation
Browse files Browse the repository at this point in the history
Before this commit, the affinity assistant was created at the beginning of
the pipelineRun, and the same affinity assistant was relied on for the
entire lifecycle of a PR. Now, there could be a case when the node
on which the affinity assistant pod is created goes down. In this case,
the rest of the pipelineRun is stuck and cannot run to completion,
since the affinity assistant (StatefulSet) tries to schedule the rest
of the pods (taskRuns) on the same node, but that node is cordoned or no
longer scheduling anything new.

This commit always makes an attempt to create the Affinity Assistant (StatefulSet)
in case it does not exist. If it exists, the controller checks whether the node on
which the Affinity Assistant pod is created is healthy enough to schedule subsequent
pods. If not, the controller deletes the Affinity Assistant pod so that the
StatefulSet can upscale the replicas (set to 1) on another node in the cluster.
  • Loading branch information
Priti Desai committed May 7, 2023
1 parent 77c1698 commit 4dfbf1f
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 11 deletions.
4 changes: 4 additions & 0 deletions config/200-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ rules:
# Controller needs to watch Pods created by TaskRuns to see them progress.
resources: ["pods"]
verbs: ["list", "watch"]
- apiGroups: [""]
# Controller needs to watch nodes for their health over the course of a single run
resources: ["nodes"]
verbs: ["get", "list", "watch"]
# Controller needs cluster access to all of the CRDs that it is responsible for
# managing.
- apiGroups: ["tekton.dev"]
Expand Down
30 changes: 28 additions & 2 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1.
for _, w := range wb {
if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil {
affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
claimName := getClaimName(w, *kmeta.NewControllerRef(pr))
switch {
case apierrors.IsNotFound(err):
Expand All @@ -68,6 +68,32 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1.
if err == nil {
logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
}
case err == nil && a != nil && a.Status.ReadyReplicas == 1:
// the node which hosts the affinity assistant pod is unschedulable or cordoned for some reason,
// delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
// get the list of pods associated with a given StatefulSet based on the following label:
// app.kubernetes.io/instance=affinity-assistant-641c8a0038
pods, err := c.KubeClientSet.CoreV1().Pods("").List(ctx, metav1.ListOptions{
LabelSelector: workspace.LabelInstance + "=" + a.Name,
})
if err != nil || len(pods.Items) == 0 {
return fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err)
}
// iterate over the list of pods - at most there can be only one pod as the requested replicas is set to 1
for _, pod := range pods.Items {
// get the node based on the affinity assistant pod - pod.Spec.nodeName
n, err := c.KubeClientSet.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("could not get the node \"%s\" on which affinity assistant pod is created, err: %w", pod.Spec.NodeName, err)
}
// delete the affinity assistant pod if the node is cordoned or unschedulable anymore
if n.Spec.Unschedulable {
err = c.KubeClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", pod.Name, pod.Namespace, err)
}
}
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
}
Expand Down Expand Up @@ -107,7 +133,7 @@ func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.
// getAffinityAssistantName derives a stable, unique name for the Affinity
// Assistant StatefulSet of a given workspace in a given PipelineRun: the
// shared component prefix followed by the first ten hex characters of the
// SHA-256 hash of the workspace name concatenated with the PipelineRun name.
func getAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string {
	hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName))
	hashString := fmt.Sprintf("%x", hashBytes)
	// use the shared constant (not a string literal) so the name prefix stays
	// in sync with the label values the workspace package applies to the pod
	return fmt.Sprintf("%s-%s", workspace.ComponentNameAffinityAssistant, hashString[:10])
}

func getStatefulSetLabels(pr *v1beta1.PipelineRun, affinityAssistantName string) map[string]string {
Expand Down
127 changes: 127 additions & 0 deletions pkg/reconciler/pipelinerun/affinity_assistant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package pipelinerun

import (
"context"
"encoding/json"
"testing"

"github.com/google/go-cmp/cmp"
Expand All @@ -28,9 +29,11 @@ import (
"github.com/tektoncd/pipeline/pkg/workspace"
"github.com/tektoncd/pipeline/test/diff"
"github.com/tektoncd/pipeline/test/parse"
"gomodules.xyz/jsonpatch/v2"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
fakek8s "k8s.io/client-go/kubernetes/fake"
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/system"
Expand Down Expand Up @@ -88,6 +91,130 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) {
}
}

// TestCreateAffinityAssistantWhenNodeIsCordoned tests that createAffinityAssistants,
// when the Affinity Assistant StatefulSet already exists and reports a ready replica,
// deletes the assistant pod scheduled on a cordoned (unschedulable) node so the
// StatefulSet can recreate it elsewhere, and returns an error when the assistant pod
// is missing or its node cannot be looked up.
func TestCreateAffinityAssistantWhenNodeIsCordoned(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	c := Reconciler{
		KubeClientSet: fakek8s.NewSimpleClientset(),
		Images:        pipeline.Images{},
	}

	workspaceName := "testws"
	pipelineRunName := "pipelinerun-1"
	testPipelineRun := &v1beta1.PipelineRun{
		TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"},
		ObjectMeta: metav1.ObjectMeta{
			Name: pipelineRunName,
		},
		Spec: v1beta1.PipelineRunSpec{
			Workspaces: []v1beta1.WorkspaceBinding{{
				Name: workspaceName,
				PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
					ClaimName: "myclaim",
				},
			}},
		},
	}

	// create affinity assistant in a cluster
	err := c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace)
	if err != nil {
		t.Errorf("unexpected error from createAffinityAssistants: %v", err)
	}

	// compute the name of the affinity assistant just created
	expectedAffinityAssistantName := getAffinityAssistantName(workspaceName, testPipelineRun.Name)

	// assume the necessary pod is created, update the affinity assistant with the number of pods in readyReplicas
	replaceReplicasPatchBytes, err := json.Marshal([]jsonpatch.JsonPatchOperation{{
		Operation: "replace",
		Path:      "/status/readyReplicas",
		Value:     int32(1),
	}})
	if err != nil {
		// the patch is built from static data, so a failure here is a test bug
		t.Fatalf("unexpected error marshalling the readyReplicas patch: %v", err)
	}
	_, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Patch(ctx, expectedAffinityAssistantName, types.JSONPatchType, replaceReplicasPatchBytes, metav1.PatchOptions{})
	if err != nil {
		t.Errorf("unexpected error updating StatefulSet: %v", err)
	}

	// register cordoned node to the cluster
	_, err = c.KubeClientSet.CoreV1().Nodes().Create(ctx, &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: "soon-to-be-cordoned-node",
		},
		Spec: corev1.NodeSpec{
			Unschedulable: true,
		},
	}, metav1.CreateOptions{})
	if err != nil {
		t.Fatalf("unexpected error creating the cordoned node: %v", err)
	}

	// now, createAffinityAssistants should be able to catch the missing affinity assistant pod with an existing StatefulSet and unschedulable node
	err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace)
	if err == nil {
		t.Error("expected error from createAffinityAssistants, but no error was reported")
	}

	p := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: expectedAffinityAssistantName + "-0",
			Labels: map[string]string{
				workspace.LabelComponent:             workspace.ComponentNameAffinityAssistant,
				workspace.LabelInstance:              expectedAffinityAssistantName,
				"statefulset.kubernetes.io/pod-name": expectedAffinityAssistantName + "-0",
			},
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion: "apps/v1",
				Kind:       "StatefulSet",
				Name:       expectedAffinityAssistantName,
			}},
		},
	}

	// create an affinity-assistant pod with the necessary labels, scheduled on the cordoned node
	_, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Create(ctx, &corev1.Pod{
		ObjectMeta: p.ObjectMeta,
		Spec: corev1.PodSpec{
			NodeName: "soon-to-be-cordoned-node",
		},
	}, metav1.CreateOptions{})
	if err != nil {
		t.Errorf("unexpected error from creating a pod for AffinityAssistant: %v", err)
	}

	// now, try to create affinity assistant again with an existing StatefulSet and unschedulable node
	err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace)
	if err != nil {
		t.Errorf("unexpected error from createAffinityAssistants: %v", err)
	}

	// the affinity assistant pod must be deleted with an affinity assistant running on a cordoned node
	_, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName+"-0", metav1.GetOptions{})
	if !apierrors.IsNotFound(err) {
		t.Errorf("expected a NotFound response, got: %v", err)
	}

	// now, create an affinity assistant pod referencing a node that does not exist in the cluster
	_, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Create(ctx, &corev1.Pod{
		ObjectMeta: p.ObjectMeta,
		Spec: corev1.PodSpec{
			NodeName: "missing-node",
		},
	}, metav1.CreateOptions{})
	if err != nil {
		t.Errorf("unexpected error from creating a pod for AffinityAssistant: %v", err)
	}

	// now, try to create affinity assistant again with an existing StatefulSet and missing node
	err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace)
	if err == nil {
		t.Error("expected error from createAffinityAssistants, but no error was reported")
	}
}

func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
prWithCustomPodTemplate := &v1beta1.PipelineRun{
TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"},
Expand Down
20 changes: 11 additions & 9 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,16 +609,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
return controller.NewPermanentError(err)
}
}
}

if !c.isAffinityAssistantDisabled(ctx) {
// create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity
if err = c.createAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil {
logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err)
pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet,
"Failed to create StatefulSet for PipelineRun %s/%s correctly: %s",
pr.Namespace, pr.Name, err)
return controller.NewPermanentError(err)
}
// Make an attempt to create Affinity Assistant if it does not exist
// if the Affinity Assistant already exists, handle the possibility of assigned node going down by deleting the pod
if !c.isAffinityAssistantDisabled(ctx) {
// create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity
if err = c.createAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil {
logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err)
pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet,
"Failed to create StatefulSet for PipelineRun %s/%s correctly: %s",
pr.Namespace, pr.Name, err)
return controller.NewPermanentError(err)
}
}

Expand Down

0 comments on commit 4dfbf1f

Please sign in to comment.