From f19cef09d5b06f7a7728a9c398f472f990d9e56e Mon Sep 17 00:00:00 2001 From: pritidesai Date: Sat, 6 May 2023 22:36:32 -0700 Subject: [PATCH] update affinity assistant creation implementation Before this commit, the affinity assistant was created in the beginning of the pipelineRun. And the same affinity assistant was relied on for the entire lifecycle of a PR. Now, there could be a case when the node on which affinity assistant pod is created goes down. In this case, the rest of the pipelineRun is stuck and cannot run to completion since the affinity assistant (StatefulSet) tries to schedule rest of the pods (taskRuns) on the same node but that node is cordoned or not scheduling anything new. This commit always makes an attempt to create Affinity Assistant (StatefulSet) in case it does not exist. If it exists, the controller checks if the node on which Affinity Assistant pod is created is healthy to schedule subsequent pods. If not, the controller deletes Affinity Assistant pod so that StatefulSet can upscale the replicas (set to 1) on another node in the cluster. Signed-off-by: Priti Desai --- config/200-clusterrole.yaml | 4 + .../pipelinerun/affinity_assistant.go | 32 ++++- .../pipelinerun/affinity_assistant_test.go | 125 ++++++++++++++++++ pkg/reconciler/pipelinerun/pipelinerun.go | 20 +-- 4 files changed, 170 insertions(+), 11 deletions(-) diff --git a/config/200-clusterrole.yaml b/config/200-clusterrole.yaml index e95592832ad..87ee01a04f2 100644 --- a/config/200-clusterrole.yaml +++ b/config/200-clusterrole.yaml @@ -25,6 +25,10 @@ rules: # Controller needs to watch Pods created by TaskRuns to see them progress. resources: ["pods"] verbs: ["list", "watch"] + - apiGroups: [""] + # Controller needs to watch nodes for their health over the course of a single run + resources: ["nodes"] + verbs: ["get", "list", "watch"] # Controller needs cluster access to all of the CRDs that it is responsible for # managing. 
- apiGroups: ["tekton.dev"] diff --git a/pkg/reconciler/pipelinerun/affinity_assistant.go b/pkg/reconciler/pipelinerun/affinity_assistant.go index edcb9c800b5..7bbc8a3f576 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant.go @@ -56,9 +56,10 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1. for _, w := range wb { if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil { affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name) - _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) + a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) claimName := getClaimName(w, *kmeta.NewControllerRef(pr)) switch { + // check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist case apierrors.IsNotFound(err): affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimName, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate) _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{}) @@ -68,6 +69,33 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1. 
if err == nil { logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace) } + // check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created + // this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586 + case err == nil && a != nil && a.Status.ReadyReplicas == 1: + // get the list of pods associated with a given StatefulSet based on the following label: + // app.kubernetes.io/instance=affinity-assistant-641c8a0038 + pods, err := c.KubeClientSet.CoreV1().Pods("").List(ctx, metav1.ListOptions{ + LabelSelector: workspace.LabelInstance + "=" + a.Name, + }) + if err != nil || len(pods.Items) == 0 { + return fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err) + } + // iterate over the list of pods - at most there can be only one pod as the requested replicas is set to 1 + for _, pod := range pods.Items { + // get the node based on the affinity assistant pod - pod.Spec.nodeName + n, err := c.KubeClientSet.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("could not get the node \"%s\" on which affinity assistant pod is created, err: %w", pod.Spec.NodeName, err) + } + // node which hosts the affinity assistant pod is unschedulable or cordoned for some reason + // if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node + if n.Spec.Unschedulable { + err = c.KubeClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", pod.Name, pod.Namespace, err) + } + } + } case err != nil: errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err)) } @@ -107,7 +135,7 @@ func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1. 
func getAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string { hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName)) hashString := fmt.Sprintf("%x", hashBytes) - return fmt.Sprintf("%s-%s", "affinity-assistant", hashString[:10]) + return fmt.Sprintf("%s-%s", workspace.ComponentNameAffinityAssistant, hashString[:10]) } func getStatefulSetLabels(pr *v1beta1.PipelineRun, affinityAssistantName string) map[string]string { diff --git a/pkg/reconciler/pipelinerun/affinity_assistant_test.go b/pkg/reconciler/pipelinerun/affinity_assistant_test.go index ff7fb6fad98..883a41e77ae 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant_test.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant_test.go @@ -18,6 +18,7 @@ package pipelinerun import ( "context" + "encoding/json" "testing" "github.com/google/go-cmp/cmp" @@ -28,9 +29,11 @@ import ( "github.com/tektoncd/pipeline/pkg/workspace" "github.com/tektoncd/pipeline/test/diff" "github.com/tektoncd/pipeline/test/parse" + "gomodules.xyz/jsonpatch/v2" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" fakek8s "k8s.io/client-go/kubernetes/fake" logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/system" @@ -88,6 +91,128 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { } } +// TestCreateAffinityAssistantWhenNodeIsCordoned tests an existing Affinity Assistant can identify the node failure and +// can migrate the affinity assistant pod to a healthy node +func TestCreateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + Images: pipeline.Images{}, + } + + workspaceName := "testws" + pipelineRunName := "pipelinerun-1" + testPipelineRun := &v1beta1.PipelineRun{ + TypeMeta: metav1.TypeMeta{Kind: 
"PipelineRun"}, + ObjectMeta: metav1.ObjectMeta{ + Name: pipelineRunName, + }, + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{{ + Name: workspaceName, + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "myclaim", + }, + }}, + }, + } + + // create affinity assistant in a cluster + err := c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err != nil { + t.Errorf("unexpected error from createAffinityAssistants: %v", err) + } + + // get the affinity assistant just created + expectedAffinityAssistantName := getAffinityAssistantName(workspaceName, testPipelineRun.Name) + + // assume the necessary pod is created, update the affinity assistant with the number of pods in readyReplicas + var replaceReplicasPatchBytes []byte + replaceReplicasPatchBytes, err = json.Marshal([]jsonpatch.JsonPatchOperation{{ + Operation: "replace", + Path: "/status/readyReplicas", + Value: int32(1), + }}) + _, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Patch(ctx, expectedAffinityAssistantName, types.JSONPatchType, replaceReplicasPatchBytes, metav1.PatchOptions{}) + if err != nil { + t.Errorf("unexpected error updating StatefulSet: %v", err) + } + + // add a cordoned node to the list of nodes + c.KubeClientSet.CoreV1().Nodes().Create(ctx, &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "soon-to-be-cordoned-node", + }, + Spec: corev1.NodeSpec{ + Unschedulable: true, + }, + }, metav1.CreateOptions{}) + + // Test 1: createAffinityAssistants must return an error when the StatefulSet has the readyReplicas set to 1 without any pod with the necessary label created + err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err == nil { + t.Error("expected error from createAffinityAssistants, but no error was reported") + } + + // now, define a pod with the affinity assistant labels and owner 
references + p := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: expectedAffinityAssistantName + "-0", + Labels: map[string]string{ + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + workspace.LabelInstance: expectedAffinityAssistantName, + "statefulset.kubernetes.io/pod-name": expectedAffinityAssistantName + "-0", + }, + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: expectedAffinityAssistantName, + }}, + }, + } + // create an affinity-assistant pod with necessary labels + _, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Create(ctx, &corev1.Pod{ + ObjectMeta: p.ObjectMeta, + Spec: corev1.PodSpec{ + NodeName: "soon-to-be-cordoned-node", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Errorf("unexpected error from creating a pod for AffinityAssistant: %v", err) + } + + // Test 2: createAffinityAssistants must delete an affinity assistant pod since the node on which its scheduled is marked as unschedulable + err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err != nil { + t.Errorf("unexpected error from createAffinityAssistants: %v", err) + } + // validation of Test 2, the affinity assistant pod must be deleted with an affinity assistant running on a cordoned node + _, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName+"-0", metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected a NotFound response, got: %v", err) + } + + // Test 3: createAffinityAssistants must return an error when the affinity assistant pod has a nodeName which does not exist + // create an affinity-assistant pod with necessary labels + _, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Create(ctx, &corev1.Pod{ + ObjectMeta: p.ObjectMeta, + Spec: corev1.PodSpec{ + NodeName: "missing-node", + }, + }, metav1.CreateOptions{}) + if err != nil { 
+ t.Errorf("unexpected error from creating a pod for AffinityAssistant: %v", err) + } + // validation of Test 3, try to create affinity assistant again with an existing StatefulSet, affinity assistant pod, and missing node + err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err == nil { + t.Error("expected error from createAffinityAssistants, but no error was reported") + } +} + func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) { prWithCustomPodTemplate := &v1beta1.PipelineRun{ TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"}, diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index e2d103fe392..5a2c537d06b 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -609,16 +609,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get return controller.NewPermanentError(err) } } + } - if !c.isAffinityAssistantDisabled(ctx) { - // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity - if err = c.createAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil { - logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) - pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet, - "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", - pr.Namespace, pr.Name, err) - return controller.NewPermanentError(err) - } + // Make an attempt to create Affinity Assistant if it does not exist + // if the Affinity Assistant already exists, handle the possibility of assigned node becoming unschedulable by deleting the pod + if !c.isAffinityAssistantDisabled(ctx) { + // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity + if err = c.createAffinityAssistants(ctx, 
pr.Spec.Workspaces, pr, pr.Namespace); err != nil { + logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) + pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet, + "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + return controller.NewPermanentError(err) } }