Skip to content

Commit

Permalink
Fix duplicated TaskRuns that occur when Pipeline/PipelineRun labels a…
Browse files Browse the repository at this point in the history
…re changed

The PipelineRun reconciler may be handed a PipelineRun that does not have a previous status update containing the names of TaskRuns it created.
The PipelineRun reconciler attempts to deal with this by looking at the TaskRun lister cache for TaskRuns associated with the PipelineRun
that are missing from the status and adding them.  It uses the PipelineRun's labels to find them.  If the user changes the PipelineRun's labels
(or the Pipeline's labels which are propagated to the PipelineRun) while the pipeline is running, it may cause the code to not find existing
TaskRuns and as a result it may duplicate them.
  • Loading branch information
GregDritschler committed Nov 23, 2020
1 parent bf8fb54 commit 1b2617c
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 12 deletions.
31 changes: 21 additions & 10 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.Resolved
}

resources.WrapSteps(&tr.Spec, rprt.PipelineTask, rprt.ResolvedTaskResources.Inputs, rprt.ResolvedTaskResources.Outputs, storageBasePath)
logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
logger.Infof("Creating a new TaskRun object %s for pipeline task %s", rprt.TaskRunName, rprt.PipelineTask.Name)
return c.PipelineClientSet.TektonV1beta1().TaskRuns(pr.Namespace).Create(ctx, tr, metav1.CreateOptions{})
}

Expand Down Expand Up @@ -753,11 +753,13 @@ func getTaskrunAnnotations(pr *v1beta1.PipelineRun) map[string]string {
return annotations
}

func getTaskrunLabels(pr *v1beta1.PipelineRun, pipelineTaskName string) map[string]string {
func getTaskrunLabels(pr *v1beta1.PipelineRun, pipelineTaskName string, includePipelineLabels bool) map[string]string {
// Propagate labels from PipelineRun to TaskRun.
labels := make(map[string]string, len(pr.ObjectMeta.Labels)+1)
for key, val := range pr.ObjectMeta.Labels {
labels[key] = val
if includePipelineLabels {
for key, val := range pr.ObjectMeta.Labels {
labels[key] = val
}
}
labels[pipeline.GroupName+pipeline.PipelineRunLabelKey] = pr.Name
if pipelineTaskName != "" {
Expand All @@ -768,7 +770,7 @@ func getTaskrunLabels(pr *v1beta1.PipelineRun, pipelineTaskName string) map[stri

func combineTaskRunAndTaskSpecLabels(pr *v1beta1.PipelineRun, pipelineTask *v1beta1.PipelineTask) map[string]string {
var tsLabels map[string]string
trLabels := getTaskrunLabels(pr, pipelineTask.Name)
trLabels := getTaskrunLabels(pr, pipelineTask.Name, true)

if pipelineTask.TaskSpec != nil {
tsLabels = pipelineTask.TaskSpecMetadata().Labels
Expand Down Expand Up @@ -868,7 +870,7 @@ func (c *Reconciler) updateLabelsAndAnnotations(ctx context.Context, pr *v1beta1
}

func (c *Reconciler) makeConditionCheckContainer(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, rcc *resources.ResolvedConditionCheck, pr *v1beta1.PipelineRun) (*v1beta1.ConditionCheck, error) {
labels := getTaskrunLabels(pr, rprt.PipelineTask.Name)
labels := getTaskrunLabels(pr, rprt.PipelineTask.Name, true)
labels[pipeline.GroupName+pipeline.ConditionCheckKey] = rcc.ConditionCheckName
labels[pipeline.GroupName+pipeline.ConditionNameKey] = rcc.Condition.Name

Expand Down Expand Up @@ -923,17 +925,20 @@ func storePipelineSpec(ctx context.Context, pr *v1beta1.PipelineRun, ps *v1beta1
func (c *Reconciler) updatePipelineRunStatusFromInformer(ctx context.Context, pr *v1beta1.PipelineRun) error {
logger := logging.FromContext(ctx)

pipelineRunLabels := getTaskrunLabels(pr, "")
// Get the pipelineRun label that is set on each TaskRun. Do not include the propagated labels from the
// Pipeline and PipelineRun. The user could change them during the lifetime of the PipelineRun so the
// current labels may not be set on the previously created TaskRuns.
pipelineRunLabels := getTaskrunLabels(pr, "", false)
taskRuns, err := c.taskRunLister.TaskRuns(pr.Namespace).List(labels.SelectorFromSet(pipelineRunLabels))
if err != nil {
logger.Errorf("could not list TaskRuns %#v", err)
return err
}
pr.Status = updatePipelineRunStatusFromTaskRuns(logger, pr.Name, pr.Status, taskRuns)
pr.Status = updatePipelineRunStatusFromTaskRuns(logger, pr, pr.Status, taskRuns)
return nil
}

func updatePipelineRunStatusFromTaskRuns(logger *zap.SugaredLogger, prName string, prStatus v1beta1.PipelineRunStatus, trs []*v1beta1.TaskRun) v1beta1.PipelineRunStatus {
func updatePipelineRunStatusFromTaskRuns(logger *zap.SugaredLogger, pr *v1beta1.PipelineRun, prStatus v1beta1.PipelineRunStatus, trs []*v1beta1.TaskRun) v1beta1.PipelineRunStatus {
// If no TaskRun was found, nothing to be done. We never remove taskruns from the status
if trs == nil || len(trs) == 0 {
return prStatus
Expand All @@ -951,6 +956,11 @@ func updatePipelineRunStatusFromTaskRuns(logger *zap.SugaredLogger, prName strin
}
// Loop over all the TaskRuns associated to Tasks
for _, taskrun := range trs {
// Only process TaskRuns that are owned by this PipelineRun.
if len(taskrun.OwnerReferences) < 1 || taskrun.OwnerReferences[0].UID != pr.ObjectMeta.UID {
logger.Infof("Found a TaskRun %s that is not owned by this PipelineRun", taskrun.Name)
continue
}
lbls := taskrun.GetLabels()
pipelineTaskName := lbls[pipeline.GroupName+pipeline.PipelineTaskLabelKey]
if _, ok := lbls[pipeline.GroupName+pipeline.ConditionCheckKey]; ok {
Expand All @@ -965,6 +975,7 @@ func updatePipelineRunStatusFromTaskRuns(logger *zap.SugaredLogger, prName strin
if _, ok := prStatus.TaskRuns[taskrun.Name]; !ok {
// This taskrun was missing from the status.
// Add it without conditions, which are handled in the next loop
logger.Infof("Found a TaskRun %s that was missing from the PipelineRun status", taskrun.Name)
prStatus.TaskRuns[taskrun.Name] = &v1beta1.PipelineRunTaskRunStatus{
PipelineTaskName: pipelineTaskName,
Status: &taskrun.Status,
Expand All @@ -982,7 +993,7 @@ func updatePipelineRunStatusFromTaskRuns(logger *zap.SugaredLogger, prName strin
// status. This means that the conditions were orphaned, and never added to the
// status. In this case we need to generate a new TaskRun name, that will be used
// to run the TaskRun if the conditions are passed.
taskRunName = resources.GetTaskRunName(prStatus.TaskRuns, pipelineTaskName, prName)
taskRunName = resources.GetTaskRunName(prStatus.TaskRuns, pipelineTaskName, pr.Name)
prStatus.TaskRuns[taskRunName] = &v1beta1.PipelineRunTaskRunStatus{
PipelineTaskName: pipelineTaskName,
Status: nil,
Expand Down
103 changes: 101 additions & 2 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3105,8 +3105,78 @@ func TestReconcileOutOfSyncPipelineRun(t *testing.T) {
}
}

func TestUpdatePipelineRunStatusFromInformer(t *testing.T) {
names.TestingSeed()

pr := tb.PipelineRun("test-pipeline-run",
tb.PipelineRunNamespace("foo"),
tb.PipelineRunLabel("mylabel", "myvalue"),
tb.PipelineRunSpec("", tb.PipelineRunPipelineSpec(
tb.PipelineTask("unit-test-task-spec", "", tb.PipelineTaskSpec(v1beta1.TaskSpec{
Steps: []v1beta1.Step{{Container: corev1.Container{
Name: "mystep",
Image: "myimage"}}},
})),
)),
)

d := test.Data{
PipelineRuns: []*v1beta1.PipelineRun{pr},
}
prt := NewPipelineRunTest(d, t)
defer prt.Cancel()

wantEvents := []string{
"Normal Started",
"Normal Running Tasks Completed: 0",
}

// Reconcile the PipelineRun. This creates a Taskrun.
reconciledRun, clients := prt.reconcileRun("foo", "test-pipeline-run", wantEvents, false)

// Save the name of the TaskRun that was created.
taskRunName := ""
for k := range reconciledRun.Status.TaskRuns {
if taskRunName != "" {
t.Fatalf("Expected 1 TaskRun but got more")
}
taskRunName = k
}
if taskRunName == "" {
t.Fatalf("Expected 1 TaskRun but got none")
}

// Add a label to the PipelineRun. This tests a scenario in issue 3126 which could prevent the reconciler
// from finding TaskRuns that are missing from the status.
reconciledRun.ObjectMeta.Labels["bah"] = "humbug"
reconciledRun, err := clients.Pipeline.TektonV1beta1().PipelineRuns("foo").Update(prt.TestAssets.Ctx, reconciledRun, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("unexpected error when updating status: %v", err)
}

// The label update triggers another reconcile. Depending on timing, the PipelineRun passed to the reconcile may or may not
// have the updated status with the name of the created TaskRun. Clear the status because we want to test the case where the
// status does not have the TaskRun.
reconciledRun.Status = v1beta1.PipelineRunStatus{}
if _, err := clients.Pipeline.TektonV1beta1().PipelineRuns("foo").UpdateStatus(prt.TestAssets.Ctx, reconciledRun, metav1.UpdateOptions{}); err != nil {
t.Fatalf("unexpected error when updating status: %v", err)
}

reconciledRun, _ = prt.reconcileRun("foo", "test-pipeline-run", wantEvents, false)

// Verify that the reconciler found the existing TaskRun instead of creating a new one.
for k := range reconciledRun.Status.TaskRuns {
if k != taskRunName {
t.Fatalf("Status has unexpected taskrun %s", k)
}
}
}

func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {

prUID := types.UID("9c04a4d9-33b8-4641-ad11-2aceed91af7b")
otherPrUID := types.UID("9c04a4d9-dead-beef-ad11-2aceed91af7b")

// PipelineRunConditionCheckStatus recovered by updatePipelineRunStatusFromTaskRuns
// It does not include the status, which is then retrieved via the regular reconcile
prccs2Recovered := map[string]*v1beta1.PipelineRunConditionCheckStatus{
Expand Down Expand Up @@ -3214,7 +3284,7 @@ func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {
prStatusWithEmptyTaskRuns := v1beta1.PipelineRunStatus{
Status: prRunningStatus,
PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{
TaskRuns: nil,
TaskRuns: map[string]*v1beta1.PipelineRunTaskRunStatus{},
},
}

Expand Down Expand Up @@ -3280,6 +3350,7 @@ func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {
Labels: map[string]string{
pipeline.GroupName + pipeline.PipelineTaskLabelKey: "task-1",
},
OwnerReferences: []metav1.OwnerReference{{UID: prUID}},
},
},
{
Expand All @@ -3290,6 +3361,7 @@ func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {
pipeline.GroupName + pipeline.ConditionCheckKey: "pr-task-2-running-condition-check-xxyyy",
pipeline.GroupName + pipeline.ConditionNameKey: "running-condition",
},
OwnerReferences: []metav1.OwnerReference{{UID: prUID}},
},
},
{
Expand All @@ -3298,6 +3370,7 @@ func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {
Labels: map[string]string{
pipeline.GroupName + pipeline.PipelineTaskLabelKey: "task-3",
},
OwnerReferences: []metav1.OwnerReference{{UID: prUID}},
},
},
{
Expand All @@ -3308,6 +3381,7 @@ func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {
pipeline.GroupName + pipeline.ConditionCheckKey: "pr-task-3-successful-condition-check-xxyyy",
pipeline.GroupName + pipeline.ConditionNameKey: "successful-condition",
},
OwnerReferences: []metav1.OwnerReference{{UID: prUID}},
},
},
{
Expand All @@ -3318,6 +3392,19 @@ func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {
pipeline.GroupName + pipeline.ConditionCheckKey: "pr-task-4-failed-condition-check-xxyyy",
pipeline.GroupName + pipeline.ConditionNameKey: "failed-condition",
},
OwnerReferences: []metav1.OwnerReference{{UID: prUID}},
},
},
}

taskRunsFromAnotherPR := []*v1beta1.TaskRun{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pr-task-1-xxyyy",
Labels: map[string]string{
pipeline.GroupName + pipeline.PipelineTaskLabelKey: "task-1",
},
OwnerReferences: []metav1.OwnerReference{{UID: otherPrUID}},
},
},
}
Expand Down Expand Up @@ -3348,6 +3435,7 @@ func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {
Labels: map[string]string{
pipeline.GroupName + pipeline.PipelineTaskLabelKey: "task-1",
},
OwnerReferences: []metav1.OwnerReference{{UID: prUID}},
},
},
},
Expand All @@ -3362,6 +3450,7 @@ func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {
Labels: map[string]string{
pipeline.GroupName + pipeline.PipelineTaskLabelKey: "task-3",
},
OwnerReferences: []metav1.OwnerReference{{UID: prUID}},
},
},
{
Expand All @@ -3372,6 +3461,7 @@ func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {
pipeline.GroupName + pipeline.ConditionCheckKey: "pr-task-3-successful-condition-check-xxyyy",
pipeline.GroupName + pipeline.ConditionNameKey: "successful-condition",
},
OwnerReferences: []metav1.OwnerReference{{UID: prUID}},
},
},
},
Expand All @@ -3386,6 +3476,11 @@ func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {
prStatus: prStatusWithOrphans,
trs: allTaskRuns,
expectedPrStatus: prStatusRecovered,
}, {
prName: "tr-from-another-pr",
prStatus: prStatusWithEmptyTaskRuns,
trs: taskRunsFromAnotherPR,
expectedPrStatus: prStatusWithEmptyTaskRuns,
},
}

Expand All @@ -3394,7 +3489,11 @@ func TestUpdatePipelineRunStatusFromTaskRuns(t *testing.T) {
observer, _ := observer.New(zap.InfoLevel)
logger := zap.New(observer).Sugar()

actualPrStatus := updatePipelineRunStatusFromTaskRuns(logger, tc.prName, tc.prStatus, tc.trs)
pr := &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: tc.prName, UID: prUID},
Status: tc.prStatus,
}
actualPrStatus := updatePipelineRunStatusFromTaskRuns(logger, pr, tc.prStatus, tc.trs)

// The TaskRun keys for recovered taskruns will contain a new random key, appended to the
// base name that we expect. Replace the random part so we can diff the whole structure
Expand Down

0 comments on commit 1b2617c

Please sign in to comment.