Handle dag in pipelineresolution #2821
Changes from all commits
@@ -40,7 +40,6 @@ import (
 	"github.com/tektoncd/pipeline/pkg/contexts"
 	"github.com/tektoncd/pipeline/pkg/reconciler"
 	"github.com/tektoncd/pipeline/pkg/reconciler/events"
-	"github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag"
 	"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
 	"github.com/tektoncd/pipeline/pkg/reconciler/taskrun"
 	"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"

@@ -284,31 +283,15 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
 		pr.ObjectMeta.Annotations[key] = value
 	}

-	d, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Tasks))
-	if err != nil {
-		// This Run has failed, so we need to mark it as failed and stop reconciling it
-		pr.Status.MarkFailed(ReasonInvalidGraph,
-			"PipelineRun %s/%s's Pipeline DAG is invalid: %s",
-			pr.Namespace, pr.Name, err)
-		return controller.NewPermanentError(err)
-	}
-
-	// build DAG with a list of final tasks, this DAG is used later to identify
-	// if a task in PipelineRunState is final task or not
-	// the finally section is optional and might not exist
-	// dfinally holds an empty Graph in the absence of finally clause
-	dfinally, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Finally))
-	if err != nil {
-		// This Run has failed, so we need to mark it as failed and stop reconciling it
-		pr.Status.MarkFailed(ReasonInvalidGraph,
-			"PipelineRun %s's Pipeline DAG is invalid for finally clause: %s",
-			pr.Namespace, pr.Name, err)
-		return controller.NewPermanentError(err)
-	}
-
 	if err := pipelineSpec.Validate(ctx); err != nil {
 		// This Run has failed, so we need to mark it as failed and stop reconciling it
-		pr.Status.MarkFailed(ReasonFailedValidation,
+		var reason = ReasonFailedValidation
+		if err.Details == "Invalid Graph" {
+			// When the validation error reports tasks or finally in the error path
+			// we can use a more specific validation error reason
+			reason = ReasonInvalidGraph
+		}
+		pr.Status.MarkFailed(reason,
 			"Pipeline %s/%s can't be Run; it has an invalid spec: %s",
 			pipelineMeta.Namespace, pipelineMeta.Name, err)
 		return controller.NewPermanentError(err)

Review thread on `if err.Details == "Invalid Graph" {`:

- Because the graph is created in […]
- @afrittoli […]
- We could certainly move the check on invalid graph to […]. Also, such a reason could be generated where the error is getting created in […].
- Validation is done by the webhook when resources are submitted to etcd.
- That's the way I implemented it originally, but because we validate the spec here, we don't really catch the error in […].
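As a rough illustration of the pattern in this hunk: `pipelineSpec.Validate` returns a knative `*apis.FieldError`, and its `Details` field is what lets the reconciler pick a more specific failure reason. A minimal, runnable sketch; the constant values here are illustrative stand-ins rather than the reconciler's actual definitions:

```go
package main

import (
	"fmt"

	"knative.dev/pkg/apis"
)

// Illustrative stand-ins for the failure reasons used in the diff above;
// the real constants are defined in the pipelinerun reconciler package.
const (
	ReasonFailedValidation = "PipelineValidationFailed"
	ReasonInvalidGraph     = "PipelineInvalidGraph"
)

// reasonFor maps a validation error to a failure reason, preferring the
// more specific ReasonInvalidGraph when the FieldError carries the
// "Invalid Graph" detail.
func reasonFor(err *apis.FieldError) string {
	if err != nil && err.Details == "Invalid Graph" {
		return ReasonInvalidGraph
	}
	return ReasonFailedValidation
}

func main() {
	graphErr := &apis.FieldError{Message: "invalid value: cycle detected", Details: "Invalid Graph"}
	specErr := &apis.FieldError{Message: "missing field(s)", Paths: []string{"tasks"}}
	fmt.Println(reasonFor(graphErr)) // PipelineInvalidGraph
	fmt.Println(reasonFor(specErr))  // PipelineValidationFailed
}
```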
@@ -372,7 +355,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
 	// pipelineState also holds a taskRun for each pipeline task after the taskRun is created
 	// pipelineState is instantiated and updated on every reconcile cycle
 	pipelineState, err := resources.ResolvePipelineRun(ctx,
-		*pr,
+		*pipelineSpec, *pr,
 		func(name string) (v1beta1.TaskInterface, error) {
 			return c.taskLister.Tasks(pr.Namespace).Get(name)
 		},

@@ -385,7 +368,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
 		func(name string) (*v1alpha1.Condition, error) {
 			return c.conditionLister.Conditions(pr.Namespace).Get(name)
 		},
-		append(pipelineSpec.Tasks, pipelineSpec.Finally...), providedResources,
+		providedResources,
 	)

 	if err != nil {

@@ -413,7 +396,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
 		return nil
 	}

-	for _, rprt := range pipelineState {
+	for _, rprt := range pipelineState.State() {
 		err := taskrun.ValidateResolvedTaskResources(rprt.PipelineTask.Params, rprt.ResolvedTaskResources)
 		if err != nil {
 			logger.Errorf("Failed to validate pipelinerun %q with error %v", pr.Name, err)

Review thread on the `ValidateResolvedTaskResources` call:

- This check […]
- Yeah, it sounds like something we could do. Perhaps in a different PR?
- yup definitely, not everything has to be part of a single PR
@@ -452,11 +435,11 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
 		return controller.NewPermanentError(err)
 	}

-	if err := c.runNextSchedulableTask(ctx, pr, d, dfinally, pipelineState, as); err != nil {
+	if err := c.runNextSchedulableTask(ctx, pr, *pipelineState, as); err != nil {
 		return err
 	}

-	after := resources.GetPipelineConditionStatus(pr, pipelineState, logger, d, dfinally)
+	after := resources.GetPipelineConditionStatus(*pr, *pipelineState, logger)
 	switch after.Status {
 	case corev1.ConditionTrue:
 		pr.Status.MarkSucceeded(after.Reason, after.Message)

@@ -467,39 +450,36 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
 	}
 	// Read the condition the way it was set by the Mark* helpers
 	after = pr.Status.GetCondition(apis.ConditionSucceeded)
-	pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
+	pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState.State())
 	logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
 	return nil
 }

-// runNextSchedulableTask gets the next schedulable Tasks from the dag based on the current
-// pipeline run state, and starts them
-// after all DAG tasks are done, it's responsible for scheduling final tasks and start executing them
-func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, d *dag.Graph, dfinally *dag.Graph, pipelineState resources.PipelineRunState, as artifacts.ArtifactStorageInterface) error {
+// runNextSchedulableTask gets the next schedulable Tasks based on the pipeline run state and starts them
+func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, pipelineState resources.ResolvedPipelineRun, as artifacts.ArtifactStorageInterface) error {
 	logger := logging.FromContext(ctx)
 	recorder := controller.GetEventRecorder(ctx)

-	var nextRprts []*resources.ResolvedPipelineRunTask
+	var (
+		nextRprts []*resources.ResolvedPipelineRunTask
+		err       error
+	)

 	// when pipeline run is stopping, do not schedule any new task and only
 	// wait for all running tasks to complete and report their status
-	if !pipelineState.IsStopping(d) {
-		// candidateTasks is initialized to DAG root nodes to start pipeline execution
-		// candidateTasks is derived based on successfully finished tasks and/or skipped tasks
-		candidateTasks, err := dag.GetSchedulable(d, pipelineState.SuccessfulOrSkippedDAGTasks(d)...)
+	if !pipelineState.IsStopping() {
+		// nextRprts holds a list of pipeline tasks which should be executed next
+		nextRprts, err = pipelineState.GetNextTasks()
 		if err != nil {
 			logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
 			return controller.NewPermanentError(err)
 		}
-		// nextRprts holds a list of pipeline tasks which should be executed next
-		nextRprts = pipelineState.GetNextTasks(candidateTasks)
 	}

 	// GetFinalTasks only returns tasks when a DAG is complete
-	nextRprts = append(nextRprts, pipelineState.GetFinalTasks(d, dfinally)...)
+	nextRprts = append(nextRprts, pipelineState.GetFinalTasks()...)

-	resolvedResultRefs, err := resources.ResolveResultRefs(pipelineState, nextRprts)
+	resolvedResultRefs, err := resources.ResolveResultRefs(pipelineState.State(), nextRprts)
 	if err != nil {
 		logger.Infof("Failed to resolve all task params for %q with error %v", pr.Name, err)
 		pr.Status.MarkFailed(ReasonFailedValidation, err.Error())

Review thread on `if !pipelineState.IsStopping() {`:

- this check can be moved in the […]
- I think this check is already redundant now, but I need to try it out, and I was planning to do so in a next PR, along with giving a single NextTasks method for the pipeline controller to use.
- sounds good, thanks 👏
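A single method along the lines floated in that thread might look roughly like this. This is a sketch only: it assumes the `ResolvedPipelineRun` type and the `IsStopping`/`GetNextTasks`/`GetFinalTasks` methods this diff introduces, and the method name `NextSchedulableTasks` is hypothetical:

```go
package resources

// NextSchedulableTasks is a hypothetical combined entry point: it folds the
// IsStopping guard and the final-task scheduling into one call, so the
// pipeline controller no longer assembles the candidate list itself.
func (state *ResolvedPipelineRun) NextSchedulableTasks() ([]*ResolvedPipelineRunTask, error) {
	var next []*ResolvedPipelineRunTask
	// While the PipelineRun is stopping, schedule no new DAG tasks; only
	// wait for running tasks to complete and report their status.
	if !state.IsStopping() {
		dagTasks, err := state.GetNextTasks()
		if err != nil {
			return nil, err
		}
		next = dagTasks
	}
	// Final tasks are only returned once the whole DAG portion is done.
	return append(next, state.GetFinalTasks()...), nil
}
```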
@@ -451,8 +451,8 @@ func TestReconcile_InvalidPipelineRuns(t *testing.T) { | |
))), | ||
tb.PipelineRun("pipeline-invalid-final-graph", tb.PipelineRunNamespace("foo"), tb.PipelineRunSpec("", tb.PipelineRunPipelineSpec( | ||
tb.PipelineTask("dag-task-1", "taskName"), | ||
tb.FinalPipelineTask("final-task-1", "taskName"), | ||
tb.FinalPipelineTask("final-task-1", "taskName")))), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This error does not trigger an invalid dag when going through validation, it fails first on |
||
tb.FinalPipelineTask("final-task-1", "taskName", tb.RunAfter("dag-task-2")), | ||
))), | ||
} | ||
|
||
d := test.Data{ | ||
|
@@ -1741,13 +1741,15 @@ func TestReconcileWithConditionChecks(t *testing.T) { | |
expectedConditionChecks[index] = makeExpectedTr(condition.Name, ccNames[condition.Name], condition.Labels, condition.Annotations) | ||
} | ||
|
||
// Check that the expected TaskRun was created | ||
condCheck0 := clients.Pipeline.Actions()[1].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun) | ||
condCheck1 := clients.Pipeline.Actions()[2].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun) | ||
if condCheck0 == nil || condCheck1 == nil { | ||
t.Errorf("Expected two ConditionCheck TaskRuns to be created, but it wasn't.") | ||
// Check that the expected TaskRun were created | ||
actions := clients.Pipeline.Actions() | ||
if !actions[1].Matches("create", "taskruns") || !actions[2].Matches("create", "taskruns") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doing a cast without checking first leads to an un-managed failure in the tests when the test fails. |
||
t.Fatalf("Expected two ConditionCheck TaskRuns to be created, got instead %d actions: %#v", len(actions), actions) | ||
} | ||
|
||
condCheck0 := actions[1].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun) | ||
condCheck1 := actions[2].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun) | ||
|
||
actual := []*v1beta1.TaskRun{condCheck0, condCheck1} | ||
if d := cmp.Diff(actual, expectedConditionChecks); d != "" { | ||
t.Errorf("expected to see 2 ConditionCheck TaskRuns created. Diff %s", diff.PrintWantGot(d)) | ||
|
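An equivalent way to get the same safety is a comma-ok assertion wrapped in a test helper; a sketch, with the helper name `createdTaskRun` invented for illustration:

```go
package pipelinerun_test

import (
	"testing"

	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
	ktesting "k8s.io/client-go/testing"
)

// createdTaskRun shows the same check-before-cast idea with comma-ok type
// assertions: a mismatch fails the test cleanly instead of panicking
// mid-assertion when the recorded action isn't what the test expects.
func createdTaskRun(t *testing.T, action ktesting.Action) *v1beta1.TaskRun {
	t.Helper()
	create, ok := action.(ktesting.CreateAction)
	if !ok {
		t.Fatalf("expected a create action, got %#v", action)
	}
	tr, ok := create.GetObject().(*v1beta1.TaskRun)
	if !ok {
		t.Fatalf("expected the created object to be a TaskRun, got %T", create.GetObject())
	}
	return tr
}
```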
Review thread on the finally graph check added in pipeline resolution:

- This check should not be added here. The `finally` section is a list of tasks, not a graph, from the user's perspective. Tasks are validated here to not include `runAfter`. This check will result in confusion. I would rather have the checks done explicitly for `runAfter`, the way it's implemented.
- The only reason I added the check here was to be consistent with the current behaviour. We have a unit test right now that enforces emitting an `InvalidDagError` to the user in case the finally DAG is not valid, and the only way to keep that with the new code org was to add it here. I'd be happy to drop this and drop the unit test and just bubble up any validation error as it's reported by the dag module directly.
- yes please, will add it back when we change `finally` to be a graph instead of a list, thanks 🙏
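The explicit `runAfter` check described above could look roughly like this; a sketch assuming Tekton's `PipelineTask.RunAfter` field, with the function name invented for illustration:

```go
package v1beta1

import "knative.dev/pkg/apis"

// validateNoRunAfterInFinally is an illustrative sketch of validating the
// finally section explicitly as a list: reject graph-style fields such as
// runAfter outright, instead of building a DAG to discover the problem.
func validateNoRunAfterInFinally(finalTasks []PipelineTask) *apis.FieldError {
	for i, t := range finalTasks {
		if len(t.RunAfter) != 0 {
			// Report the offending entry by its index under "finally".
			return apis.ErrDisallowedFields("runAfter").ViaFieldIndex("finally", i)
		}
	}
	return nil
}
```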