Skip to content

Commit

Permalink
update affinity assistant creation implementation
Browse files Browse the repository at this point in the history
Before this commit, the affinity assistant was created at the beginning of
the pipelineRun, and the same affinity assistant was relied on for the
entire lifecycle of that pipelineRun. Now, there could be a case when the node
on which the affinity assistant pod is created goes down. In this case,
the rest of the pipelineRun is stuck and cannot run to completion,
since the affinity assistant (StatefulSet) tries to schedule the rest
of the pods (taskRuns) on the same node, but that node is cordoned or not
scheduling anything new.

This commit always makes an attempt to create the Affinity Assistant (StatefulSet)
in case it does not exist. If it exists, the controller checks whether the node on
which the Affinity Assistant pod was created is healthy enough to schedule subsequent
pods. If not, the controller deletes the Affinity Assistant pod so that the
StatefulSet can recreate its replica (replicas is set to 1) on another node in the cluster.
  • Loading branch information
Priti Desai committed Apr 28, 2023
1 parent 626669e commit e828393
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 10 deletions.
33 changes: 32 additions & 1 deletion pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1.
for _, w := range wb {
if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil {
affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
claimName := getClaimName(w, *kmeta.NewControllerRef(pr))
switch {
case apierrors.IsNotFound(err):
Expand All @@ -68,6 +68,37 @@ func (c *Reconciler) createAffinityAssistants(ctx context.Context, wb []v1beta1.
if err == nil {
logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
}
case err == nil && a != nil:
// the node which hosts the affinity assistant pod is unschedulable or cordoned for some reason,
// delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
// get the list of pods associated with a given StatefulSet based on the following label:
// app.kubernetes.io/instance=affinity-assistant-641c8a0038
pods, err := c.KubeClientSet.CoreV1().Pods("").List(ctx, metav1.ListOptions{
LabelSelector: workspace.LabelInstance + "=" + a.Name,
})
if err != nil {
return fmt.Errorf("could not list affinity assistant pods for StatefulSet %s: %s", a.Name, err)
}
// there should be only one pod created for a given StatefulSet since we have set the replicas to 1
if len(pods.Items) > 1 {
return fmt.Errorf("the affinity assistant only creates one replica but we found \"%d\" pods", len(pods.Items))
}
// iterate over the list of pods
// at most there can be only one pod but could be zero due to some network delay
for _, pod := range pods.Items {
// get the node based on the affinity assistant pod - pod.Spec.nodeName
n, err := c.KubeClientSet.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("could not get the node \"%s\" on which affinity assistant pod is created", pod.Spec.NodeName)
}
// delete the affinity assistant pod if the node is cordoned or unschedulable anymore
if n.Spec.Unschedulable {
err = c.KubeClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %s", pod.Name, pod.Namespace, err)
}
}
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,16 +607,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
return controller.NewPermanentError(err)
}
}
}

if !c.isAffinityAssistantDisabled(ctx) {
// create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity
if err = c.createAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil {
logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err)
pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet,
"Failed to create StatefulSet for PipelineRun %s/%s correctly: %s",
pr.Namespace, pr.Name, err)
return controller.NewPermanentError(err)
}
// Make an attempt to create Affinity Assistant if it does not exist
// if the Affinity Assistant already exists, handle the possibility of assigned node going down by deleting the pod
if !c.isAffinityAssistantDisabled(ctx) {
// create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity
if err = c.createAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil {
logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err)
pr.Status.MarkFailed(ReasonCouldntCreateAffinityAssistantStatefulSet,
"Failed to create StatefulSet for PipelineRun %s/%s correctly: %s",
pr.Namespace, pr.Name, err)
return controller.NewPermanentError(err)
}
}

Expand Down

0 comments on commit e828393

Please sign in to comment.