From e663374c02f974d47436f6888c5b022e19b7019e Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Tue, 28 Jul 2020 14:56:30 -0700 Subject: [PATCH 1/4] fix: DAG level Output Artifacts on K8S and Kubelet executor --- workflow/validate/validate.go | 4 ++ workflow/validate/validate_test.go | 74 ++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go index 29b467496d85..634d99e9a3a6 100644 --- a/workflow/validate/validate.go +++ b/workflow/validate/validate.go @@ -1002,6 +1002,10 @@ func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error { // validateBaseImageOutputs detects if the template contains an valid output from base image layer func (ctx *templateValidationCtx) validateBaseImageOutputs(tmpl *wfv1.Template) error { + // This validation is not applicable for DAG and Step Template types + if tmpl.GetType() == wfv1.TemplateTypeDAG || tmpl.GetType() == wfv1.TemplateTypeSteps { + return nil + } switch ctx.ContainerRuntimeExecutor { case "", common.ContainerRuntimeExecutorDocker: // docker executor supports all modes of artifact outputs diff --git a/workflow/validate/validate_test.go b/workflow/validate/validate_test.go index 69a8d29b3dd2..e69fafbaef92 100644 --- a/workflow/validate/validate_test.go +++ b/workflow/validate/validate_test.go @@ -35,6 +35,11 @@ func validate(yamlStr string) (*wfv1.Conditions, error) { return ValidateWorkflow(wftmplGetter, cwftmplGetter, wf, ValidateOpts{}) } +func validateWithOptions(yamlStr string, opts ValidateOpts) (*wfv1.Conditions, error) { + wf := unmarshalWf(yamlStr) + return ValidateWorkflow(wftmplGetter, cwftmplGetter, wf, opts) +} + // validateWorkflowTemplate is a test helper to accept WorkflowTemplate YAML as a string and return // its validation result. func validateWorkflowTemplate(yamlStr string) error { @@ -2542,3 +2547,72 @@ func TestWorkflowWithWFTRefWithOverrideParam(t *testing.T) { _, err = validate(wfWithWFTRefOverrideParam) assert.NoError(t, err) } + +var dagAndStepLevelOutputArtifacts = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: dag-target- +spec: + entrypoint: main + templates: + - name: main + outputs: + artifacts: + - name: artifact + from: "{{tasks.artifact-svn-retrieve.outputs.artifacts.artifact}}" + dag: + tasks: + - name: artifact-svn-retrieve + template: artifact-svn-retrieve + - name: step-tmpl + template: step + + - name: step + outputs: + artifacts: + - name: artifact + from: "{{steps.artifact-svn-retrieve.outputs.artifacts.artifact}}" + steps: + - - name: artifact-svn-retrieve + template: artifact-svn-retrieve + + - name: artifact-svn-retrieve + outputs: + artifacts: + - name: artifact + path: "/vol/hello_world.txt" + container: + image: docker/whalesay:latest + command: [sh, -c] + args: ["sleep 1; cowsay hello world | tee /vol/hello_world.txt"] + volumeMounts: + - name: vol + mountPath: "/vol" + volumes: + - name: vol + emptyDir: {} +` + +func TestDagAndStepLevelOutputArtifactsForDiffExecutor(t *testing.T) { + t.Run("DefaultExecutor", func(t *testing.T) { + _, err := validateWithOptions(dagAndStepLevelOutputArtifacts, ValidateOpts{ContainerRuntimeExecutor: ""}) + assert.NoError(t, err) + }) + t.Run("DockerExecutor", func(t *testing.T) { + _, err := validateWithOptions(dagAndStepLevelOutputArtifacts, ValidateOpts{ContainerRuntimeExecutor: common.ContainerRuntimeExecutorDocker}) + assert.NoError(t, err) + }) + t.Run("PNSExecutor", func(t *testing.T) { + _, err := validateWithOptions(dagAndStepLevelOutputArtifacts, ValidateOpts{ContainerRuntimeExecutor: common.ContainerRuntimeExecutorPNS}) + assert.NoError(t, err) + }) + t.Run("K8SExecutor", func(t *testing.T) { + _, err := validateWithOptions(dagAndStepLevelOutputArtifacts, ValidateOpts{ContainerRuntimeExecutor: common.ContainerRuntimeExecutorK8sAPI}) + assert.NoError(t, err) + }) + t.Run("KubeletExecutor", func(t *testing.T) { + _, err := validateWithOptions(dagAndStepLevelOutputArtifacts, ValidateOpts{ContainerRuntimeExecutor: common.ContainerRuntimeExecutorKubelet}) + assert.NoError(t, err) + }) +} \ No newline at end of file From b17c19963033ddefa892c71222392c026904f1f8 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 27 Aug 2020 21:52:07 -0700 Subject: [PATCH 2/4] fix: Custom metrics are not recorded for DAG tasks --- workflow/controller/dag.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index b627f48691f2..78b46752db9e 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -314,6 +314,14 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) { node := dagCtx.getTaskNode(taskName) task := dagCtx.GetTask(taskName) if node != nil && node.Fulfilled() { + // Collect the completed leaf task metrics + tmpl := woc.wf.GetTemplateByName(task.Template) + if tmpl != nil { + if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Fulfilled() { + localScope, realTimeScope := woc.prepareMetricScope(node) + woc.computeMetrics(tmpl.Metrics.Prometheus, localScope, realTimeScope, false) + } + } if node.Completed() { // Run the node's onExit node, if any. hasOnExitNode, onExitNode, err := woc.runOnExitNode(task.OnExit, task.Name, node.Name, dagCtx.boundaryID, dagCtx.tmplCtx) From 7c6f4f3e7a65d3c92b087984c7b5ca54700cdcc5 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Fri, 28 Aug 2020 14:55:46 -0700 Subject: [PATCH 3/4] Added test --- workflow/controller/dag.go | 4 +- workflow/controller/operator_metrics_test.go | 118 +++++++++++++++++++ 2 files changed, 120 insertions(+), 2 deletions(-) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 78b46752db9e..0c7a83a9d432 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -314,9 +314,9 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) { node := dagCtx.getTaskNode(taskName) task := dagCtx.GetTask(taskName) if node != nil && node.Fulfilled() { - // Collect the completed leaf task metrics + // Collect the completed task metrics tmpl := woc.wf.GetTemplateByName(task.Template) - if tmpl != nil { + if tmpl != nil && tmpl.Metrics != nil { if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Fulfilled() { localScope, realTimeScope := woc.prepareMetricScope(node) woc.computeMetrics(tmpl.Metrics.Prometheus, localScope, realTimeScope, false) diff --git a/workflow/controller/operator_metrics_test.go b/workflow/controller/operator_metrics_test.go index d1cbc4f0e6b7..c7e060a7c40a 100644 --- a/workflow/controller/operator_metrics_test.go +++ b/workflow/controller/operator_metrics_test.go @@ -311,3 +311,121 @@ func TestRetryStrategyMetric(t *testing.T) { assert.Contains(t, metricErrorCounterString, `counter: `) } + +var dagTmplMetrics = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: hello-world-nl9bj +spec: + arguments: {} + entrypoint: steps + templates: + - arguments: {} + dag: + tasks: + - arguments: {} + name: random-int-dag + template: random-int + - arguments: {} + name: flakey-dag + template: flakey + + name: steps + outputs: {} + - arguments: {} + container: + args: + - RAND_INT=$((1 + RANDOM % 10)); echo $RAND_INT; echo $RAND_INT > /tmp/rand_int.txt + command: + - sh + - -c + image: alpine:latest + name: "" + resources: {} + inputs: {} + metadata: {} + metrics: + prometheus: + - help: Value of the int emitted by random-int at step level + histogram: + buckets: + - 2.01 + - 4.01 + - 6.01 + - 8.01 + - 10.01 + value: 5 + name: random_int_step_histogram_dag + - gauge: + realtime: true + value: '{{duration}}' + help: Duration gauge by name + labels: + - key: name + value: random-int + name: duration_gauge_dag + name: random-int + outputs: + parameters: + - globalName: rand-int-value + name: rand-int-value + valueFrom: + path: /tmp/rand_int.txt + - arguments: {} + container: + args: + - import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code) + command: + - python + - -c + image: python:alpine3.6 + name: "" + resources: {} + inputs: {} + metadata: {} + metrics: + prometheus: + - counter: + value: "1" + help: Count of step execution by result status + labels: + - key: name + value: flakey + - key: status + value: Failed + name: result_counter_dag + name: flakey + outputs: {} +` + +func TestDAGTmplMetrics(t *testing.T) { + cancel, controller := newController() + defer cancel() + wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("") + wf := unmarshalWF(dagTmplMetrics) + _, err := wfcset.Create(wf) + assert.NoError(t, err) + woc := newWorkflowOperationCtx(wf, controller) + + woc.operate() + makePodsPhase(t, apiv1.PodSucceeded, controller.kubeclientset, wf.Namespace) + woc.operate() + tmpl := woc.wf.GetTemplateByName("random-int") + assert.NotNil(t, tmpl) + metricDesc := tmpl.Metrics.Prometheus[0].GetDesc() + assert.NotNil(t, controller.metrics.GetCustomMetric(metricDesc)) + metricHistogram := controller.metrics.GetCustomMetric(metricDesc).(prometheus.Histogram) + metricHistogramString, err := getMetricStringValue(metricHistogram) + assert.NoError(t, err) + assert.Contains(t, metricHistogramString, `histogram: `) +} From c24ef125e8e733256aaa1e523fa06ebdbe99b462 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Tue, 1 Sep 2020 16:37:00 -0700 Subject: [PATCH 4/4] Update dag.go --- workflow/controller/dag.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 0c7a83a9d432..57560ff16f41 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -315,7 +315,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) { task := dagCtx.GetTask(taskName) if node != nil && node.Fulfilled() { // Collect the completed task metrics - tmpl := woc.wf.GetTemplateByName(task.Template) + _, tmpl, _, _ := dagCtx.tmplCtx.ResolveTemplate(task) if tmpl != nil && tmpl.Metrics != nil { if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Fulfilled() { localScope, realTimeScope := woc.prepareMetricScope(node)