diff --git a/config/200-clusterrole.yaml b/config/200-clusterrole.yaml index e95592832ad..87ee01a04f2 100644 --- a/config/200-clusterrole.yaml +++ b/config/200-clusterrole.yaml @@ -25,6 +25,10 @@ rules: # Controller needs to watch Pods created by TaskRuns to see them progress. resources: ["pods"] verbs: ["list", "watch"] + - apiGroups: [""] + # Controller needs to watch nodes for their health over the course of a single run + resources: ["nodes"] + verbs: ["get", "list", "watch"] # Controller needs cluster access to all of the CRDs that it is responsible for # managing. - apiGroups: ["tekton.dev"] diff --git a/pkg/reconciler/pipelinerun/affinity_assistant.go b/pkg/reconciler/pipelinerun/affinity_assistant.go index edcb9c800b5..7bbc8a3f576 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant.go @@ -56,9 +56,10 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1. for _, w := range wb { if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil { affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name) - _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) + a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) claimName := getClaimName(w, *kmeta.NewControllerRef(pr)) switch { + // check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist case apierrors.IsNotFound(err): affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimName, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate) _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{}) @@ -68,6 +69,33 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1. if err == nil { logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace) } + // check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created + // this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586 + case err == nil && a != nil && a.Status.ReadyReplicas == 1: + // get the list of pods associated with a given StatefulSet based on the following label: + // app.kubernetes.io/instance=affinity-assistant-641c8a0038 + pods, err := c.KubeClientSet.CoreV1().Pods("").List(ctx, metav1.ListOptions{ + LabelSelector: workspace.LabelInstance + "=" + a.Name, + }) + if err != nil || len(pods.Items) == 0 { + return fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err) + } + // iterate over the list of pods - at most there can be only one pod as the requested replicas is set to 1 + for _, pod := range pods.Items { + // get the node based on the affinity assistant pod - pod.Spec.nodeName + n, err := c.KubeClientSet.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("could not get the node \"%s\" on which affinity assistant pod is created, err: %w", pod.Spec.NodeName, err) + } + // node which hosts the affinity assistant pod is unschedulable or cordoned for some reason + // if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node + if n.Spec.Unschedulable { + err = c.KubeClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", pod.Name, pod.Namespace, err) + } + } + } case err != nil: errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err)) } @@ -107,7 +135,7 @@ func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1. func getAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string { hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName)) hashString := fmt.Sprintf("%x", hashBytes) - return fmt.Sprintf("%s-%s", "affinity-assistant", hashString[:10]) + return fmt.Sprintf("%s-%s", workspace.ComponentNameAffinityAssistant, hashString[:10]) } func getStatefulSetLabels(pr *v1beta1.PipelineRun, affinityAssistantName string) map[string]string { diff --git a/pkg/reconciler/pipelinerun/affinity_assistant_test.go b/pkg/reconciler/pipelinerun/affinity_assistant_test.go index ff7fb6fad98..883a41e77ae 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant_test.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant_test.go @@ -18,6 +18,7 @@ package pipelinerun import ( "context" + "encoding/json" "testing" "github.com/google/go-cmp/cmp" @@ -28,9 +29,11 @@ import ( "github.com/tektoncd/pipeline/pkg/workspace" "github.com/tektoncd/pipeline/test/diff" "github.com/tektoncd/pipeline/test/parse" + "gomodules.xyz/jsonpatch/v2" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" fakek8s "k8s.io/client-go/kubernetes/fake" logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/system" @@ -88,6 +91,128 @@ func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { } } +// TestCreateAffinityAssistantWhenNodeIsCordoned tests an existing Affinity Assistant can identify the node failure and +// can migrate the affinity assistant pod to a healthy node +func TestCreateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + Images: pipeline.Images{}, + } + + workspaceName := "testws" + pipelineRunName := "pipelinerun-1" + testPipelineRun := &v1beta1.PipelineRun{ + TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"}, + ObjectMeta: metav1.ObjectMeta{ + Name: pipelineRunName, + }, + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{{ + Name: workspaceName, + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "myclaim", + }, + }}, + }, + } + + // create affinity assistant in a cluster + err := c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err != nil { + t.Errorf("unexpected error from createAffinityAssistants: %v", err) + } + + // get the affinity assistant just created + expectedAffinityAssistantName := getAffinityAssistantName(workspaceName, testPipelineRun.Name) + + // assume the necessary pod is created, update the affinity assistant with the number of pods in readyReplicas + var replaceReplicasPatchBytes []byte + replaceReplicasPatchBytes, err = json.Marshal([]jsonpatch.JsonPatchOperation{{ + Operation: "replace", + Path: "/status/readyReplicas", + Value: int32(1), + }}) + _, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Patch(ctx, expectedAffinityAssistantName, types.JSONPatchType, replaceReplicasPatchBytes, metav1.PatchOptions{}) + if err != nil { + t.Errorf("unexpected error updating StatefulSet: %v", err) + } + + // add a cordoned node to the list of nodes + c.KubeClientSet.CoreV1().Nodes().Create(ctx, &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "soon-to-be-cordoned-node", + }, + Spec: corev1.NodeSpec{ + Unschedulable: true, + }, + }, metav1.CreateOptions{}) + + // Test 1: createAffinityAssistants must return an error when the StatefulSet has the readyReplicas set to 1 without any pod with the necessary label created + err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err == nil { + t.Error("expected error from createAffinityAssistants, but no error was reported") + } + + // now, define a pod with the affinity assistant labels and owner references + p := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: expectedAffinityAssistantName + "-0", + Labels: map[string]string{ + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + workspace.LabelInstance: expectedAffinityAssistantName, + "statefulset.kubernetes.io/pod-name": expectedAffinityAssistantName + "-0", + }, + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: expectedAffinityAssistantName, + }}, + }, + } + // create an affinity-assistant pod with necessary labels + _, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Create(ctx, &corev1.Pod{ + ObjectMeta: p.ObjectMeta, + Spec: corev1.PodSpec{ + NodeName: "soon-to-be-cordoned-node", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Errorf("unexpected error from creating a pod for AffinityAssistant: %v", err) + } + + // Test 2: createAffinityAssistants must delete an affinity assistant pod since the node on which its scheduled is marked as unschedulable + err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err != nil { + t.Errorf("unexpected error from createAffinityAssistants: %v", err) + } + // validation of Test 2, the affinity assistant pod must be deleted with an affinity assistant running on a cordoned node + _, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName+"-0", metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected a NotFound response, got: %v", err) + } + + // Test 3: createAffinityAssistants must return an error when the affinity assistant pod has a nodeName which does not exist + // create an affinity-assistant pod with necessary labels + _, err = c.KubeClientSet.CoreV1().Pods(testPipelineRun.Namespace).Create(ctx, &corev1.Pod{ + ObjectMeta: p.ObjectMeta, + Spec: corev1.PodSpec{ + NodeName: "missing-node", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Errorf("unexpected error from creating a pod for AffinityAssistant: %v", err) + } + // validation of Test 3, try to create affinity assistant again with an existing StatefulSet, affinity assistant pod, and missing node + err = c.createAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) + if err == nil { + t.Error("expected error from createAffinityAssistants, but no error was reported") + } +} + func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) { prWithCustomPodTemplate := &v1beta1.PipelineRun{ TypeMeta: metav1.TypeMeta{Kind: "PipelineRun"}, diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index e2d103fe392..5a2c537d06b 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -609,16 +609,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get return controller.NewPermanentError(err) } } + } - if !c.isAffinityAssistantDisabled(ctx) { - // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity - if err = c.createAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil { - logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) - pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet, - "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", - pr.Namespace, pr.Name, err) - return controller.NewPermanentError(err) - } + // Make an attempt to create Affinity Assistant if it does not exist + // if the Affinity Assistant already exists, handle the possibility of assigned node becoming unschedulable by deleting the pod + if !c.isAffinityAssistantDisabled(ctx) { + // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity + if err = c.createAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil { + logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) + pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet, + "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + return controller.NewPermanentError(err) } }