diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index f6687352742..eaa5b4baf10 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -524,6 +524,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get return err } + // Reset the skipped status to trigger recalculation + pipelineRunFacts.ResetSkippedCache() + after := pipelineRunFacts.GetPipelineConditionStatus(pr, logger) switch after.Status { case corev1.ConditionTrue: @@ -565,6 +568,9 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip } resources.ApplyTaskResults(nextRprts, resolvedResultRefs) + // After we apply Task Results, we may be able to evaluate more + // when expressions, so reset the skipped cache + pipelineRunFacts.ResetSkippedCache() // GetFinalTasks only returns tasks when a DAG is complete nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...) diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go index 787a89a1ef8..96c51576f8c 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go @@ -133,13 +133,7 @@ func (t *ResolvedPipelineRunTask) checkParentsDone(facts *PipelineRunFacts) bool return true } -// Skip returns true if a PipelineTask will not be run because -// (1) its When Expressions evaluated to false -// (2) its Condition Checks failed -// (3) its parent task was skipped -// (4) Pipeline is in stopping state (one of the PipelineTasks failed) -// Note that this means Skip returns false if a conditionCheck is in progress -func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool { +func (t *ResolvedPipelineRunTask) skip(facts *PipelineRunFacts) bool { if facts.isFinalTask(t.PipelineTask.Name) || t.IsStarted() { return false } @@ -151,6 +145,22 @@ func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool { return false } +// Skip returns true if a PipelineTask will not be run because +// (1) its When Expressions evaluated to false +// (2) its Condition Checks failed +// (3) its parent task was skipped +// (4) Pipeline is in stopping state (one of the PipelineTasks failed) +// Note that this means Skip returns false if a conditionCheck is in progress +func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool { + if facts.SkipCache == nil { + facts.SkipCache = make(map[string]bool) + } + if _, cached := facts.SkipCache[t.PipelineTask.Name]; !cached { + facts.SkipCache[t.PipelineTask.Name] = t.skip(facts) // t.skip() is same as our existing t.Skip() + } + return facts.SkipCache[t.PipelineTask.Name] +} + func (t *ResolvedPipelineRunTask) conditionsSkip() bool { if len(t.ResolvedConditionChecks) > 0 { if t.ResolvedConditionChecks.IsDone() && !t.ResolvedConditionChecks.IsSuccess() { diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go index f9a5e965d70..80a483d1268 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go @@ -37,6 +37,16 @@ type PipelineRunFacts struct { State PipelineRunState TasksGraph *dag.Graph FinalTasksGraph *dag.Graph + + // SkipCache is a hash of PipelineTask names that stores whether a task will be + // executed or not, because it's either not reachable via the DAG due to the pipeline + // state, or because it has failed conditions. + // We cache this data along the state, because it's expensive to compute, it requires + // traversing potentially the whole graph; this way it can built incrementally, when + // needed, via the `Skip` method in pipelinerunresolution.go + // The skip data is sensitive to changes in the state. The ResetSkippedCache method + // can be used to clean the cache and force re-computation when needed. + SkipCache map[string]bool } // pipelineRunStatusCount holds the count of successful, failed, cancelled, skipped, and incomplete tasks @@ -53,6 +63,11 @@ type pipelineRunStatusCount struct { Incomplete int } +// ResetSkippedCache resets the skipped cache in the facts map +func (facts *PipelineRunFacts) ResetSkippedCache() { + facts.SkipCache = make(map[string]bool) +} + // ToMap returns a map that maps pipeline task name to the resolved pipeline run task func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineRunTask { m := make(map[string]*ResolvedPipelineRunTask) diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go index 0407c89f19e..86bad2ce4b0 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go @@ -17,6 +17,7 @@ limitations under the License. package resources import ( + "fmt" "testing" "time" @@ -408,6 +409,7 @@ func TestGetNextTaskWithRetries(t *testing.T) { } func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { + largePipelineState := buildPipelineStateWithLargeDepencyGraph(t) tcs := []struct { name string state PipelineRunState @@ -454,6 +456,10 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { "not skipped since it failed", state: conditionCheckFailedWithOthersFailedState, expectedNames: []string{pts[5].Name}, + }, { + name: "large deps, not started", + state: largePipelineState, + expectedNames: []string{}, }} for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -474,6 +480,65 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { } } +func buildPipelineStateWithLargeDepencyGraph(t *testing.T) PipelineRunState { + t.Helper() + var task = &v1beta1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "task", + }, + Spec: v1beta1.TaskSpec{ + Steps: []v1beta1.Step{{Container: corev1.Container{ + Name: "step1", + }}}, + }, + } + var pipelineRunState PipelineRunState + pipelineRunState = []*ResolvedPipelineRunTask{{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "t1", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + TaskRun: nil, + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + }} + for i := 2; i < 60; i++ { + dependFrom := 1 + if i > 10 { + if i%10 == 0 { + dependFrom = i - 10 + } else { + dependFrom = i - (i % 10) + } + } + params := []v1beta1.Param{} + var alpha byte + for alpha = 'a'; alpha <= 'j'; alpha++ { + params = append(params, v1beta1.Param{ + Name: fmt.Sprintf("%c", alpha), + Value: v1beta1.ArrayOrString{ + Type: v1beta1.ParamTypeString, + StringVal: fmt.Sprintf("$(tasks.t%d.results.%c)", dependFrom, alpha), + }, + }) + } + pipelineRunState = append(pipelineRunState, &ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: fmt.Sprintf("t%d", i), + Params: params, + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + TaskRun: nil, + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + }, + ) + } + return pipelineRunState +} + func TestPipelineRunState_GetFinalTasks(t *testing.T) { tcs := []struct { name string