From cf499efb6732af3dbb4c75c162a19b1f1705c0ba Mon Sep 17 00:00:00 2001 From: Chmouel Boudjnah Date: Fri, 25 Oct 2024 13:37:08 +0200 Subject: [PATCH] Make sure we collect the right prun in the Queue When initializing the queues, we need to make sure we only collect the pruns that we want to for pending and running queues. This filter out as well the non existent ones, in case if it was deleted but we have the old one referencing it in its state. Signed-off-by: Chmouel Boudjnah --- pkg/reconciler/queue_pipelineruns.go | 4 +- pkg/sync/queue_manager.go | 35 ++++++++- pkg/sync/queue_manager_test.go | 113 ++++++++++++++++++++++----- 3 files changed, 127 insertions(+), 25 deletions(-) diff --git a/pkg/reconciler/queue_pipelineruns.go b/pkg/reconciler/queue_pipelineruns.go index 7805329f3..2ab129f17 100644 --- a/pkg/reconciler/queue_pipelineruns.go +++ b/pkg/reconciler/queue_pipelineruns.go @@ -7,6 +7,8 @@ import ( "github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/keys" "github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1" + "github.com/openshift-pipelines/pipelines-as-code/pkg/kubeinteraction" + "github.com/openshift-pipelines/pipelines-as-code/pkg/sync" tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" "go.uber.org/zap" "k8s.io/apimachinery/pkg/api/errors" @@ -50,7 +52,7 @@ func (r *Reconciler) queuePipelineRun(ctx context.Context, logger *zap.SugaredLo return nil } - orderedList := strings.Split(order, ",") + orderedList := sync.FilterPipelineRunByState(ctx, r.run.Clients.Tekton, strings.Split(order, ","), tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued) acquired, err := r.qm.AddListToRunningQueue(repo, orderedList) if err != nil { return fmt.Errorf("failed to add to queue: %s: %w", pr.GetName(), err) diff --git a/pkg/sync/queue_manager.go b/pkg/sync/queue_manager.go index a504afa1b..6710c6326 100644 --- a/pkg/sync/queue_manager.go +++ b/pkg/sync/queue_manager.go @@ -160,6 +160,35 @@ func getQueueKey(run *tektonv1.PipelineRun) string { return fmt.Sprintf("%s/%s", run.Namespace, run.Name) } +// FilterPipelineRunByState filters the given list of PipelineRun names to only include those +// that are in a "queued" state and have a pending status. It retrieves the PipelineRun objects +// from the Tekton API and checks their annotations and status to determine if they should be included. +// +// Returns A list of PipelineRun names that are in a "queued" state and have a pending status. +func FilterPipelineRunByState(ctx context.Context, tekton versioned2.Interface, orderList []string, wantedStatus, wantedState string) []string { + orderedList := []string{} + for _, prName := range orderList { + prKey := strings.Split(prName, "/") + pr, err := tekton.TektonV1().PipelineRuns(prKey[0]).Get(ctx, prKey[1], v1.GetOptions{}) + if err != nil { + continue + } + + state, exist := pr.GetAnnotations()[keys.State] + if !exist { + continue + } + + if state == wantedState { + if wantedStatus != "" && pr.Spec.Status != tektonv1.PipelineRunSpecStatus(wantedStatus) { + continue + } + orderedList = append(orderedList, prName) + } + } + return orderedList +} + // InitQueues rebuild all the queues for all repository if concurrency is defined before // reconciler started reconciling them. func (qm *QueueManager) InitQueues(ctx context.Context, tekton versioned2.Interface, pac versioned.Interface) error { @@ -194,7 +223,8 @@ func (qm *QueueManager) InitQueues(ctx context.Context, tekton versioned2.Interf // if the pipelineRun doesn't have order label then wait return nil } - orderedList := strings.Split(order, ",") + orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), "", kubeinteraction.StateStarted) + _, err = qm.AddListToRunningQueue(&repo, orderedList) if err != nil { qm.logger.Error("failed to init queue for repo: ", repo.GetName()) @@ -219,8 +249,7 @@ func (qm *QueueManager) InitQueues(ctx context.Context, tekton versioned2.Interf // if the pipelineRun doesn't have order label then wait return nil } - orderedList := strings.Split(order, ",") - + orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued) if err := qm.AddToPendingQueue(&repo, orderedList); err != nil { qm.logger.Error("failed to init queue for repo: ", repo.GetName()) } diff --git a/pkg/sync/queue_manager_test.go b/pkg/sync/queue_manager_test.go index c02a343bd..33c621d6a 100644 --- a/pkg/sync/queue_manager_test.go +++ b/pkg/sync/queue_manager_test.go @@ -1,6 +1,7 @@ package sync import ( + "fmt" "testing" "time" @@ -16,6 +17,7 @@ import ( "go.uber.org/zap" zapobserver "go.uber.org/zap/zaptest/observer" "gotest.tools/v3/assert" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" duckv1 "knative.dev/pkg/apis/duck/v1" rtesting "knative.dev/pkg/reconciler/testing" @@ -30,7 +32,7 @@ func TestSomeoneElseSetPendingWithNoConcurrencyLimit(t *testing.T) { // unset concurrency limit repo.Spec.ConcurrencyLimit = nil - pr := newTestPR("first", time.Now(), nil, nil) + pr := newTestPR("first", time.Now(), nil, nil, tektonv1.PipelineRunSpec{}) // set to pending pr.Status.Conditions = duckv1.Conditions{ { @@ -52,7 +54,7 @@ func TestAddToPendingQueueDirectly(t *testing.T) { // unset concurrency limit repo.Spec.ConcurrencyLimit = nil - pr := newTestPR("first", time.Now(), nil, nil) + pr := newTestPR("first", time.Now(), nil, nil, tektonv1.PipelineRunSpec{}) // set to pending pr.Status.Conditions = duckv1.Conditions{ { @@ -77,7 +79,7 @@ func TestNewQueueManagerForList(t *testing.T) { repo := newTestRepo(1) // first pipelineRun - prFirst := newTestPR("first", time.Now(), nil, nil) + prFirst := newTestPR("first", time.Now(), nil, nil, tektonv1.PipelineRunSpec{}) // added to queue, as there is only one should start started, err := qm.AddListToRunningQueue(repo, []string{getQueueKey(prFirst)}) @@ -89,8 +91,8 @@ func TestNewQueueManagerForList(t *testing.T) { // adding another 2 pipelineRun, limit is 1 so this will be added to pending queue and // then one will be started - prSecond := newTestPR("second", time.Now().Add(1*time.Second), nil, nil) - prThird := newTestPR("third", time.Now().Add(7*time.Second), nil, nil) + prSecond := newTestPR("second", time.Now().Add(1*time.Second), nil, nil, tektonv1.PipelineRunSpec{}) + prThird := newTestPR("third", time.Now().Add(7*time.Second), nil, nil, tektonv1.PipelineRunSpec{}) started, err = qm.AddListToRunningQueue(repo, []string{getQueueKey(prSecond), getQueueKey(prThird)}) assert.NilError(t, err) @@ -99,8 +101,8 @@ func TestNewQueueManagerForList(t *testing.T) { assert.Equal(t, started[0], getQueueKey(prSecond)) // adding 2 more, will be going to pending queue - prFourth := newTestPR("fourth", time.Now().Add(5*time.Second), nil, nil) - prFifth := newTestPR("fifth", time.Now().Add(4*time.Second), nil, nil) + prFourth := newTestPR("fourth", time.Now().Add(5*time.Second), nil, nil, tektonv1.PipelineRunSpec{}) + prFifth := newTestPR("fifth", time.Now().Add(4*time.Second), nil, nil, tektonv1.PipelineRunSpec{}) started, err = qm.AddListToRunningQueue(repo, []string{getQueueKey(prFourth), getQueueKey(prFifth)}) assert.NilError(t, err) @@ -112,9 +114,9 @@ func TestNewQueueManagerForList(t *testing.T) { // changing the concurrency limit to 2 repo.Spec.ConcurrencyLimit = intPtr(2) - prSixth := newTestPR("sixth", time.Now().Add(7*time.Second), nil, nil) - prSeventh := newTestPR("seventh", time.Now().Add(5*time.Second), nil, nil) - prEight := newTestPR("eight", time.Now().Add(4*time.Second), nil, nil) + prSixth := newTestPR("sixth", time.Now().Add(7*time.Second), nil, nil, tektonv1.PipelineRunSpec{}) + prSeventh := newTestPR("seventh", time.Now().Add(5*time.Second), nil, nil, tektonv1.PipelineRunSpec{}) + prEight := newTestPR("eight", time.Now().Add(4*time.Second), nil, nil, tektonv1.PipelineRunSpec{}) started, err = qm.AddListToRunningQueue(repo, []string{getQueueKey(prSixth), getQueueKey(prSeventh), getQueueKey(prEight)}) assert.NilError(t, err) @@ -132,9 +134,9 @@ func TestNewQueueManagerReListing(t *testing.T) { // repository for which pipelineRun are created repo := newTestRepo(2) - prFirst := newTestPR("first", time.Now(), nil, nil) - prSecond := newTestPR("second", time.Now().Add(1*time.Second), nil, nil) - prThird := newTestPR("third", time.Now().Add(7*time.Second), nil, nil) + prFirst := newTestPR("first", time.Now(), nil, nil, tektonv1.PipelineRunSpec{}) + prSecond := newTestPR("second", time.Now().Add(1*time.Second), nil, nil, tektonv1.PipelineRunSpec{}) + prThird := newTestPR("third", time.Now().Add(7*time.Second), nil, nil, tektonv1.PipelineRunSpec{}) // added to queue, as there is only one should start started, err := qm.AddListToRunningQueue(repo, []string{getQueueKey(prFirst), getQueueKey(prSecond), getQueueKey(prThird)}) @@ -158,9 +160,9 @@ func TestNewQueueManagerReListing(t *testing.T) { assert.Equal(t, qm.QueuedPipelineRuns(repo)[0], "test-ns/third") // a new request comes - prFourth := newTestPR("fourth", time.Now(), nil, nil) - prFifth := newTestPR("fifth", time.Now().Add(1*time.Second), nil, nil) - prSixths := newTestPR("sixth", time.Now().Add(7*time.Second), nil, nil) + prFourth := newTestPR("fourth", time.Now(), nil, nil, tektonv1.PipelineRunSpec{}) + prFifth := newTestPR("fifth", time.Now().Add(1*time.Second), nil, nil, tektonv1.PipelineRunSpec{}) + prSixths := newTestPR("sixth", time.Now().Add(7*time.Second), nil, nil, tektonv1.PipelineRunSpec{}) started, err = qm.AddListToRunningQueue(repo, []string{getQueueKey(prFourth), getQueueKey(prFifth), getQueueKey(prSixths)}) assert.NilError(t, err) @@ -184,7 +186,7 @@ func newTestRepo(limit int) *v1alpha1.Repository { var intPtr = func(val int) *int { return &val } -func newTestPR(name string, time time.Time, labels, annotations map[string]string) *tektonv1.PipelineRun { +func newTestPR(name string, time time.Time, labels, annotations map[string]string, spec tektonv1.PipelineRunSpec) *tektonv1.PipelineRun { return &tektonv1.PipelineRun{ TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{ @@ -194,7 +196,7 @@ func newTestPR(name string, time time.Time, labels, annotations map[string]strin Labels: labels, Annotations: annotations, }, - Spec: tektonv1.PipelineRunSpec{}, + Spec: spec, Status: tektonv1.PipelineRunStatus{}, } } @@ -222,9 +224,13 @@ func TestQueueManager_InitQueues(t *testing.T) { keys.ExecutionOrder: "test-ns/first,test-ns/second,test-ns/third", keys.State: kubeinteraction.StateStarted, } - firstPR := newTestPR("first", cw.Now(), startedLabel, startedAnnotations) - secondPR := newTestPR("second", cw.Now().Add(5*time.Second), queuedLabel, queuedAnnotations) - thirdPR := newTestPR("third", cw.Now().Add(3*time.Second), queuedLabel, queuedAnnotations) + firstPR := newTestPR("first", cw.Now(), startedLabel, startedAnnotations, tektonv1.PipelineRunSpec{}) + secondPR := newTestPR("second", cw.Now().Add(5*time.Second), queuedLabel, queuedAnnotations, tektonv1.PipelineRunSpec{ + Status: tektonv1.PipelineRunSpecStatusPending, + }) + thirdPR := newTestPR("third", cw.Now().Add(3*time.Second), queuedLabel, queuedAnnotations, tektonv1.PipelineRunSpec{ + Status: tektonv1.PipelineRunSpecStatusPending, + }) tdata := testclient.Data{ Repositories: []*v1alpha1.Repository{repo}, @@ -255,3 +261,68 @@ func TestQueueManager_InitQueues(t *testing.T) { runs = qm.QueuedPipelineRuns(repo) assert.Equal(t, len(runs), 1) } + +func TestFilterPipelineRunByInProgress(t *testing.T) { + ctx, _ := rtesting.SetupFakeContext(t) + ns := "test-ns" + + // Create a fake Tekton client + pipelineRuns := []*tektonv1.PipelineRun{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pr1", + Namespace: ns, + Annotations: map[string]string{ + keys.State: kubeinteraction.StateQueued, + }, + }, + Spec: tektonv1.PipelineRunSpec{ + Status: tektonv1.PipelineRunSpecStatusPending, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pr2", + Namespace: ns, + Annotations: map[string]string{ + keys.State: kubeinteraction.StateCompleted, + }, + }, + Spec: tektonv1.PipelineRunSpec{ + Status: tektonv1.PipelineRunSpecStatusPending, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pr3", + Namespace: ns, + Annotations: map[string]string{ + keys.State: kubeinteraction.StateQueued, + }, + }, + Spec: tektonv1.PipelineRunSpec{ + Status: tektonv1.PipelineRunSpecStatusCancelled, + }, + }, + } + + tdata := testclient.Data{ + Namespaces: []*corev1.Namespace{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: ns, + }, + }, + }, + PipelineRuns: pipelineRuns, + } + + orderList := []string{} + for _, pr := range pipelineRuns { + orderList = append(orderList, fmt.Sprintf("%s/%s", ns, pr.GetName())) + } + stdata, _ := testclient.SeedTestData(t, ctx, tdata) + filtered := FilterPipelineRunByState(ctx, stdata.Pipeline, orderList, tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued) + expected := []string{"test-ns/pr1"} + assert.DeepEqual(t, filtered, expected) +}