From 4dfbf1fdf8ead0b9036d9e6c2d6044b87a166ebe Mon Sep 17 00:00:00 2001 From: Priti Desai Date: Thu, 27 Apr 2023 22:37:50 -0700 Subject: [PATCH] update affinity assistant creation implementation Before this commit, the affinity assistant was created at the beginning of the pipelineRun. And the same affinity assistant was relied on for the entire lifecycle of a PR. Now, there could be a case when the node on which affinity assistant pod is created goes down. In this case, the rest of the pipelineRun is stuck and cannot run to completion since the affinity assistant (StatefulSet) tries to schedule the rest of the pods (taskRuns) on the same node but that node is cordoned or not scheduling anything new. This commit always makes an attempt to create Affinity Assistant (StatefulSet) in case it does not exist. If it exists, the controller checks if the node on which Affinity Assistant pod is created is healthy to schedule subsequent pods. If not, the controller deletes the Affinity Assistant pod so that StatefulSet can upscale the replicas (set to 1) on another node in the cluster. --- config/200-clusterrole.yaml | 4 + .../pipelinerun/affinity_assistant.go | 30 ++++- .../pipelinerun/affinity_assistant_test.go | 127 ++++++++++++++++++ pkg/reconciler/pipelinerun/pipelinerun.go | 20 +-- 4 files changed, 170 insertions(+), 11 deletions(-) diff --git a/config/200-clusterrole.yaml b/config/200-clusterrole.yaml index e95592832ad..87ee01a04f2 100644 --- a/config/200-clusterrole.yaml +++ b/config/200-clusterrole.yaml @@ -25,6 +25,10 @@ rules: # Controller needs to watch Pods created by TaskRuns to see them progress. resources: ["pods"] verbs: ["list", "watch"] + - apiGroups: [""] + # Controller needs to watch nodes for their health over the course of a single run + resources: ["nodes"] + verbs: ["get", "list", "watch"] # Controller needs cluster access to all of the CRDs that it is responsible for # managing. 
- apiGroups: ["tekton.dev"] diff --git a/pkg/reconciler/pipelinerun/affinity_assistant.go b/pkg/reconciler/pipelinerun/affinity_assistant.go index edcb9c800b5..54f74472e71 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant.go @@ -56,7 +56,7 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1. for _, w := range wb { if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil { affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name) - _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) + a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) claimName := getClaimName(w, *kmeta.NewControllerRef(pr)) switch { case apierrors.IsNotFound(err): @@ -68,6 +68,32 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1. if err == nil { logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace) } + case err == nil && a != nil && a.Status.ReadyReplicas == 1: + // the node which hosts the affinity assistant pod is unschedulable or cordoned for some reason, + // delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node + // get the list of pods associated with a given StatefulSet based on the following label: + // app.kubernetes.io/instance=affinity-assistant-641c8a0038 + pods, err := c.KubeClientSet.CoreV1().Pods("").List(ctx, metav1.ListOptions{ + LabelSelector: workspace.LabelInstance + "=" + a.Name, + }) + if err != nil || len(pods.Items) == 0 { + return fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err) + } + // iterate over the list of pods - at most there can be only one pod as the requested replicas is set to 1 + for _, pod := range pods.Items { + // get the node based on the affinity assistant pod - 
pod.Spec.nodeName + n, err := c.KubeClientSet.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("could not get the node \"%s\" on which affinity assistant pod is created, err: %w", pod.Spec.NodeName, err) + } + // delete the affinity assistant pod if the node is cordoned or unschedulable anymore + if n.Spec.Unschedulable { + err = c.KubeClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", pod.Name, pod.Namespace, err) + } + } + } case err != nil: errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err)) } @@ -107,7 +133,7 @@ func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1. func getAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string { hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName)) hashString := fmt.Sprintf("%x", hashBytes) - return fmt.Sprintf("%s-%s", "affinity-assistant", hashString[:10]) + return fmt.Sprintf("%s-%s", workspace.ComponentNameAffinityAssistant, hashString[:10]) } func getStatefulSetLabels(pr *v1beta1.PipelineRun, affinityAssistantName string) map[string]string { diff --git a/pkg/reconciler/pipelinerun/affinity_assistant_test.go b/pkg/reconciler/pipelinerun/affinity_assistant_test.go index ff7fb6fad98..bd17ad27b70 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant_test.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant_test.go @@ -18,6 +18,7 @@ package pipelinerun import ( "context" + "encoding/json" "testing" "github.com/google/go-cmp/cmp" @@ -28,9 +29,11 @@ import ( "github.com/tektoncd/pipeline/pkg/workspace" "github.com/tektoncd/pipeline/test/diff" "github.com/tektoncd/pipeline/test/parse" + "gomodules.xyz/jsonpatch/v2" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" fakek8s "k8s.io/client-go/kubernetes/fake" logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/system" @@ -88,6 +91,130 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { } } +// TestCreateAndDeleteOfAffinityAssistant tests to create and delete an Affinity Assistant +// for a given PipelineRun with a PVC workspace +func TestCreateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + Images: pipeline.Images{}, + } + + workspaceName := "testws" + pipelineRunName := "pipelinerun-1" + testPipelineRun := &v1beta1.PipelineRun{ + TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"}, + ObjectMeta: metav1.ObjectMeta{ + Name: pipelineRunName, + }, + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{{ + Name: workspaceName, + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "myclaim", + }, + }}, + }, + } + + // create affinity assistant in a cluster + err := c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err != nil { + t.Errorf("unexpected error from createAffinityAssistants: %v", err) + } + + // get the affinity assistant just created + expectedAffinityAssistantName := getAffinityAssistantName(workspaceName, testPipelineRun.Name) + + // assume the necessary pod is created, update the affinity assistant with the number of pods in readyReplicas + var replaceReplicasPatchBytes []byte + replaceReplicasPatchBytes, err = json.Marshal([]jsonpatch.JsonPatchOperation{{ + Operation: "replace", + Path: "/status/readyReplicas", + Value: int32(1), + }}) + _, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Patch(ctx, expectedAffinityAssistantName, types.JSONPatchType, replaceReplicasPatchBytes, 
metav1.PatchOptions{}) + if err != nil { + t.Errorf("unexpected error updating StatefulSet: %v", err) + } + + // register cordoned node to the cluster + c.KubeClientSet.CoreV1().Nodes().Create(ctx, &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "soon-to-be-cordoned-node", + }, + Spec: corev1.NodeSpec{ + Unschedulable: true, + }, + }, metav1.CreateOptions{}) + + // now, createAffinityAssistants should be able to catch the missing affinity assistant pod with an existing StatefulSet and unschedulable node + err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err == nil { + t.Error("expected error from createAffinityAssistants, but no error was reported") + } + + p := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: expectedAffinityAssistantName + "-0", + Labels: map[string]string{ + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + workspace.LabelInstance: expectedAffinityAssistantName, + "statefulset.kubernetes.io/pod-name": expectedAffinityAssistantName + "-0", + }, + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: expectedAffinityAssistantName, + }}, + }, + } + + // create an affinity-assistant pod with necessary labels + _, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Create(ctx, &corev1.Pod{ + ObjectMeta: p.ObjectMeta, + Spec: corev1.PodSpec{ + NodeName: "soon-to-be-cordoned-node", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Errorf("unexpected error from creating a pod for AffinityAssistant: %v", err) + } + + // now, try to create affinity assistant again with an existing StatefulSet and unschedulable node + err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err != nil { + t.Errorf("unexpected error from createAffinityAssistants: %v", err) + } + + // the affinity assistant pod must be deleted with an affinity 
assistant running on a cordoned node + _, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName+"-0", metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected a NotFound response, got: %v", err) + } + + // now, create an affinity assistant pod with a missing node + // create an affinity-assistant pod with necessary labels + _, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Create(ctx, &corev1.Pod{ + ObjectMeta: p.ObjectMeta, + Spec: corev1.PodSpec{ + NodeName: "missing-node", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Errorf("unexpected error from creating a pod for AffinityAssistant: %v", err) + } + + // now, try to create affinity assistant again with an existing StatefulSet and missing node + err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err == nil { + t.Error("expected error from createAffinityAssistants, but no error was reported") + } +} + func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) { prWithCustomPodTemplate := &v1beta1.PipelineRun{ TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"}, diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index e2d103fe392..828732172e3 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -609,16 +609,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get return controller.NewPermanentError(err) } } + } - if !c.isAffinityAssistantDisabled(ctx) { - // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity - if err = c.createAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil { - logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) - 
pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet, - "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", - pr.Namespace, pr.Name, err) - return controller.NewPermanentError(err) - } + // Make an attempt to create Affinity Assistant if it does not exist + // if the Affinity Assistant already exists, handle the possibility of assigned node going down by deleting the pod + if !c.isAffinityAssistantDisabled(ctx) { + // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity + if err = c.createAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil { + logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) + pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet, + "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + return controller.NewPermanentError(err) } }