diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index e6058b8a537e..6cd270fd2af2 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -314,6 +314,14 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) { node := dagCtx.getTaskNode(taskName) task := dagCtx.GetTask(taskName) if node != nil && node.Fulfilled() { + // Collect the completed task metrics + _, tmpl, _, _ := dagCtx.tmplCtx.ResolveTemplate(task) + if tmpl != nil && tmpl.Metrics != nil { + if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Fulfilled() { + localScope, realTimeScope := woc.prepareMetricScope(node) + woc.computeMetrics(tmpl.Metrics.Prometheus, localScope, realTimeScope, false) + } + } if node.Completed() { // Run the node's onExit node, if any. hasOnExitNode, onExitNode, err := woc.runOnExitNode(task.OnExit, task.Name, node.Name, dagCtx.boundaryID, dagCtx.tmplCtx) diff --git a/workflow/controller/operator_metrics_test.go b/workflow/controller/operator_metrics_test.go index d1cbc4f0e6b7..c7e060a7c40a 100644 --- a/workflow/controller/operator_metrics_test.go +++ b/workflow/controller/operator_metrics_test.go @@ -311,3 +311,121 @@ func TestRetryStrategyMetric(t *testing.T) { assert.Contains(t, metricErrorCounterString, `counter: `) } + +var dagTmplMetrics = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: hello-world-nl9bj +spec: + arguments: {} + entrypoint: steps + templates: + - arguments: {} + dag: + tasks: + - arguments: {} + name: random-int-dag + template: random-int + - arguments: {} + name: flakey-dag + template: flakey + + name: steps + outputs: {} + - arguments: {} + container: + args: + - RAND_INT=$((1 + RANDOM % 10)); echo $RAND_INT; echo $RAND_INT > /tmp/rand_int.txt + command: + - sh + - -c + image: alpine:latest + name: "" + resources: {} + inputs: {} + metadata: {} + metrics: + prometheus: + - help: Value of the int emitted by random-int at step level + histogram: + buckets: + - 2.01 + - 4.01 + - 6.01 + - 8.01 + - 10.01 + value: 5 + name: random_int_step_histogram_dag + - gauge: + realtime: true + value: '{{duration}}' + help: Duration gauge by name + labels: + - key: name + value: random-int + name: duration_gauge_dag + name: random-int + outputs: + parameters: + - globalName: rand-int-value + name: rand-int-value + valueFrom: + path: /tmp/rand_int.txt + - arguments: {} + container: + args: + - import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code) + command: + - python + - -c + image: python:alpine3.6 + name: "" + resources: {} + inputs: {} + metadata: {} + metrics: + prometheus: + - counter: + value: "1" + help: Count of step execution by result status + labels: + - key: name + value: flakey + - key: status + value: Failed + name: result_counter_dag + name: flakey + outputs: {} +` + +func TestDAGTmplMetrics(t *testing.T) { + cancel, controller := newController() + defer cancel() + wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("") + wf := unmarshalWF(dagTmplMetrics) + _, err := wfcset.Create(wf) + assert.NoError(t, err) + woc := newWorkflowOperationCtx(wf, controller) + + woc.operate() + makePodsPhase(t, apiv1.PodSucceeded, controller.kubeclientset, wf.Namespace) + woc.operate() + tmpl := woc.wf.GetTemplateByName("random-int") + assert.NotNil(t, tmpl) + metricDesc := tmpl.Metrics.Prometheus[0].GetDesc() + assert.NotNil(t, controller.metrics.GetCustomMetric(metricDesc)) + metricHistogram := controller.metrics.GetCustomMetric(metricDesc).(prometheus.Histogram) + metricHistogramString, err := getMetricStringValue(metricHistogram) + assert.NoError(t, err) + assert.Contains(t, metricHistogramString, `histogram: `) +}