From 1dc113d4029495029b827a50968c2bae589621d0 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 15 Jul 2020 16:56:09 -0700 Subject: [PATCH 01/11] fix: Exceeding quota with volumeClaimTemplates --- examples/steps.yaml | 9 +++++++++ workflow/controller/operator.go | 27 +++++++++++++++++++++------ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/examples/steps.yaml b/examples/steps.yaml index c0434f4644a6..b72b71f2799a 100644 --- a/examples/steps.yaml +++ b/examples/steps.yaml @@ -10,18 +10,27 @@ spec: - name: hello-hello-hello steps: - - name: hello1 + onExit: exit template: whalesay arguments: parameters: [{name: message, value: "hello1"}] - - name: hello2a + onExit: exit template: whalesay arguments: parameters: [{name: message, value: "hello2a"}] - name: hello2b + onExit: exit template: whalesay arguments: parameters: [{name: message, value: "hello2b"}] + - name: exit + container: + image: docker/whalesay + command: [cowsay] + args: ["{{steps.hello1.exitCode}}"] + - name: whalesay inputs: parameters: diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 5888c0760210..839b5ac0e2ee 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -282,6 +282,14 @@ func (woc *wfOperationCtx) operate() { err = woc.createPVCs() if err != nil { + if apierr.IsForbidden(err) { + // Error was most likely caused by a lack of resources. + // In this case, Workflow will be in pending state and requeue. + woc.markWorkflowPhase(wfv1.NodePending, false) + woc.requeue(0) + return + + } msg := "pvc create error" woc.log.WithError(err).Error(msg) woc.markWorkflowError(err, true) @@ -1225,16 +1233,20 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) { } func (woc *wfOperationCtx) createPVCs() error { - if woc.wf.Status.Phase != wfv1.NodeRunning { - // Only attempt to create PVCs if workflow transitioned to Running state + + if !(woc.wf.Status.Phase == wfv1.NodePending || woc.wf.Status.Phase == wfv1.NodeRunning) { + // Only attempt to create PVCs if workflow is in Pending or Running state // (e.g. passed validation, or didn't already complete) return nil } - if len(woc.wfSpec.VolumeClaimTemplates) == len(woc.wf.Status.PersistentVolumeClaims) { - // If we have already created the PVCs, then there is nothing to do. - // This will also handle the case where workflow has no volumeClaimTemplates. - return nil + + existPVC := make(map[string]bool) + for _, pvc := range woc.wf.Status.PersistentVolumeClaims { + if pvc.Name != "" { + existPVC[pvc.PersistentVolumeClaim.ClaimName] = true + } } + if len(woc.wf.Status.PersistentVolumeClaims) == 0 { woc.wf.Status.PersistentVolumeClaims = make([]apiv1.Volume, len(woc.wfSpec.VolumeClaimTemplates)) } @@ -1243,6 +1255,9 @@ func (woc *wfOperationCtx) createPVCs() error { if pvcTmpl.ObjectMeta.Name == "" { return errors.Errorf(errors.CodeBadRequest, "volumeClaimTemplates[%d].metadata.name is required", i) } + if found := existPVC[pvcTmpl.ObjectMeta.Name]; found { + continue + } pvcTmpl = *pvcTmpl.DeepCopy() // PVC name will be - refName := pvcTmpl.ObjectMeta.Name From 94746b28ccecb6f5f69174d23c4056752cbdf1b7 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 15 Jul 2020 16:57:28 -0700 Subject: [PATCH 02/11] Update steps.yaml --- examples/steps.yaml | 9 --------- 1 file changed, 9 deletions(-) diff --git a/examples/steps.yaml b/examples/steps.yaml index b72b71f2799a..c0434f4644a6 100644 --- a/examples/steps.yaml +++ b/examples/steps.yaml @@ -10,27 +10,18 @@ spec: - name: hello-hello-hello steps: - - name: hello1 - onExit: exit template: whalesay arguments: parameters: [{name: message, value: "hello1"}] - - name: hello2a - onExit: exit template: whalesay arguments: parameters: [{name: message, value: "hello2a"}] - name: hello2b - onExit: exit template: whalesay arguments: parameters: [{name: message, value: "hello2b"}] - - name: exit - container: - image: docker/whalesay - command: [cowsay] - args: ["{{steps.hello1.exitCode}}"] - - name: whalesay inputs: parameters: From fe66eab8f66415ebeb5eb4bf34ac313646d331fa Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 15 Jul 2020 19:16:08 -0700 Subject: [PATCH 03/11] Added e2e test --- test/e2e/fixtures/when.go | 9 +++++++++ test/e2e/functional_test.go | 18 ++++++++++++++++++ test/e2e/testdata/storage-limit.yaml | 24 ++++++++++++++++++++++++ test/util/resourcequota.go | 18 +++++++++++++++--- 4 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 test/e2e/testdata/storage-limit.yaml diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go index c51cf9045501..86475db83c37 100644 --- a/test/e2e/fixtures/when.go +++ b/test/e2e/fixtures/when.go @@ -227,6 +227,15 @@ func (w *When) MemoryQuota(quota string) *When { return w } +func (w *When) StorageQuota(quota string) *When { + obj, err := util.CreateHardStorageQuota(w.kubeClient, "argo", "storage-quota", quota) + if err != nil { + w.t.Fatal(err) + } + w.resourceQuota = obj + return w +} + func (w *When) DeleteQuota() *When { err := util.DeleteQuota(w.kubeClient, w.resourceQuota) if err != nil { diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index 9fc50e1127c5..1594036216ba 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -639,6 +639,24 @@ spec: }) } +func (s *FunctionalSuite) TestStorageQuotaLimit() { + s.Given(). + Workflow("@testdata/storage-limit.yaml"). + When(). + StorageQuota("5Mi"). + SubmitWorkflow(). + WaitForWorkflowToStart(5*time.Second). + WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool { + return wfv1.NodePending == wf.Status.Phase + }, "PVC pending", 10*time.Second). + DeleteQuota(). + WaitForWorkflow(30 * time.Second). + Then(). + ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + assert.Equal(t, wfv1.NodeSucceeded, status.Phase) + }) +} + func TestFunctionalSuite(t *testing.T) { suite.Run(t, new(FunctionalSuite)) } diff --git a/test/e2e/testdata/storage-limit.yaml b/test/e2e/testdata/storage-limit.yaml new file mode 100644 index 000000000000..7e5b679fed47 --- /dev/null +++ b/test/e2e/testdata/storage-limit.yaml @@ -0,0 +1,24 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: storage-quota-limit + labels: + argo-e2e: true +spec: + serviceAccountName: argo + entrypoint: wait + volumeClaimTemplates: # define volume, same syntax as k8s Pod spec + - metadata: + name: workdir1 # name of volume claim + spec: + accessModes: [ "ReadWriteMany" ] + resources: + requests: + storage: 20Mi + + templates: + - name: wait + script: + image: alpine:latest + command: [sh, -c] + args: ["echo", "10s"] diff --git a/test/util/resourcequota.go b/test/util/resourcequota.go index e6343ddf0d46..63874b8c1bc5 100644 --- a/test/util/resourcequota.go +++ b/test/util/resourcequota.go @@ -8,15 +8,27 @@ import ( ) func CreateHardMemoryQuota(clientset kubernetes.Interface, namespace, name, memoryLimit string) (*corev1.ResourceQuota, error) { + resourceList := corev1.ResourceList{ + corev1.ResourceLimitsMemory: resource.MustParse(memoryLimit), + } + return CreateResourceQuota(clientset, namespace, name, resourceList) +} + +func CreateHardStorageQuota(clientset kubernetes.Interface, namespace, name, storageLimit string) (*corev1.ResourceQuota, error) { + resourceList := corev1.ResourceList{ + "requests.storage": resource.MustParse(storageLimit), + } + return CreateResourceQuota(clientset, namespace, name, resourceList) +} + +func CreateResourceQuota(clientset kubernetes.Interface, namespace, name string, rl corev1.ResourceList) (*corev1.ResourceQuota, error) { return clientset.CoreV1().ResourceQuotas(namespace).Create(&corev1.ResourceQuota{ ObjectMeta: metav1.ObjectMeta{ Name: name, Labels: map[string]string{"argo-e2e": "true"}, }, Spec: corev1.ResourceQuotaSpec{ - Hard: corev1.ResourceList{ - corev1.ResourceLimitsMemory: resource.MustParse(memoryLimit), - }, + Hard: rl, }, }) } From dce4ce4c5efa7de87bbae75b487a53837796d892 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 16 Jul 2020 08:59:51 -0700 Subject: [PATCH 04/11] Update operator.go --- workflow/controller/operator.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 839b5ac0e2ee..bfc04b759567 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1233,20 +1233,17 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) { } func (woc *wfOperationCtx) createPVCs() error { - if !(woc.wf.Status.Phase == wfv1.NodePending || woc.wf.Status.Phase == wfv1.NodeRunning) { // Only attempt to create PVCs if workflow is in Pending or Running state // (e.g. passed validation, or didn't already complete) return nil } - existPVC := make(map[string]bool) for _, pvc := range woc.wf.Status.PersistentVolumeClaims { if pvc.Name != "" { existPVC[pvc.PersistentVolumeClaim.ClaimName] = true } } - if len(woc.wf.Status.PersistentVolumeClaims) == 0 { woc.wf.Status.PersistentVolumeClaims = make([]apiv1.Volume, len(woc.wfSpec.VolumeClaimTemplates)) } From 794cd6fda9b68bce52ebbde17109a1d7f11b2da9 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Mon, 20 Jul 2020 08:17:34 -0700 Subject: [PATCH 05/11] addressed comments --- test/e2e/fixtures/when.go | 12 +++++++++++- test/e2e/functional_test.go | 5 +++-- workflow/controller/operator.go | 29 +++++++++++++---------------- 3 files changed, 27 insertions(+), 19 deletions(-) diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go index 86475db83c37..bceef8536026 100644 --- a/test/e2e/fixtures/when.go +++ b/test/e2e/fixtures/when.go @@ -35,6 +35,7 @@ type When struct { cronWorkflowName string kubeClient kubernetes.Interface resourceQuota *corev1.ResourceQuota + storageQuota *corev1.ResourceQuota configMap *corev1.ConfigMap } @@ -232,7 +233,16 @@ func (w *When) StorageQuota(quota string) *When { if err != nil { w.t.Fatal(err) } - w.resourceQuota = obj + w.storageQuota = obj + return w +} + +func (w *When) DeleteStorageQuota() *When { + err := util.DeleteQuota(w.kubeClient, w.storageQuota) + if err != nil { + w.t.Fatal(err) + } + w.storageQuota = nil return w } diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index 1594036216ba..460479a780de 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -4,6 +4,7 @@ package e2e import ( "regexp" + "strings" "testing" "time" @@ -647,9 +648,9 @@ func (s *FunctionalSuite) TestStorageQuotaLimit() { SubmitWorkflow(). WaitForWorkflowToStart(5*time.Second). WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool { - return wfv1.NodePending == wf.Status.Phase + return strings.Contains(wf.Status.Message, "Waiting for a PVC to be created") }, "PVC pending", 10*time.Second). - DeleteQuota(). + DeleteStorageQuota(). WaitForWorkflow(30 * time.Second). Then(). ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index bfc04b759567..0c0b2e196066 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -284,11 +284,11 @@ func (woc *wfOperationCtx) operate() { if err != nil { if apierr.IsForbidden(err) { // Error was most likely caused by a lack of resources. - // In this case, Workflow will be in pending state and requeue. - woc.markWorkflowPhase(wfv1.NodePending, false) - woc.requeue(0) + // In this case, Workflow will be requeued. + woc.wf.Status.Message = fmt.Sprintf("Waiting for a PVC to be created. %v", err) + woc.updated = true + woc.requeue(10) return - } msg := "pvc create error" woc.log.WithError(err).Error(msg) @@ -1238,23 +1238,16 @@ func (woc *wfOperationCtx) createPVCs() error { // (e.g. passed validation, or didn't already complete) return nil } - existPVC := make(map[string]bool) - for _, pvc := range woc.wf.Status.PersistentVolumeClaims { - if pvc.Name != "" { - existPVC[pvc.PersistentVolumeClaim.ClaimName] = true - } - } - if len(woc.wf.Status.PersistentVolumeClaims) == 0 { - woc.wf.Status.PersistentVolumeClaims = make([]apiv1.Volume, len(woc.wfSpec.VolumeClaimTemplates)) + if len(woc.wfSpec.VolumeClaimTemplates) == len(woc.wf.Status.PersistentVolumeClaims) { + // If we have already created the PVCs, then there is nothing to do. + // This will also handle the case where workflow has no volumeClaimTemplates. + return nil } pvcClient := woc.controller.kubeclientset.CoreV1().PersistentVolumeClaims(woc.wf.ObjectMeta.Namespace) for i, pvcTmpl := range woc.wfSpec.VolumeClaimTemplates { if pvcTmpl.ObjectMeta.Name == "" { return errors.Errorf(errors.CodeBadRequest, "volumeClaimTemplates[%d].metadata.name is required", i) } - if found := existPVC[pvcTmpl.ObjectMeta.Name]; found { - continue - } pvcTmpl = *pvcTmpl.DeepCopy() // PVC name will be - refName := pvcTmpl.ObjectMeta.Name @@ -1301,7 +1294,11 @@ func (woc *wfOperationCtx) createPVCs() error { }, }, } - woc.wf.Status.PersistentVolumeClaims[i] = vol + woc.wf.Status.PersistentVolumeClaims = append(woc.wf.Status.PersistentVolumeClaims, vol) + // Clearing previous PVC forbidden error + if woc.wf.Status.Message != "" && strings.Contains(woc.wf.Status.Message, "Waiting for a PVC to be created") { + woc.wf.Status.Message = "" + } woc.updated = true } return nil From 31a60f351140e484fa1fedc61e1f852efd6f3df2 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Mon, 20 Jul 2020 08:21:32 -0700 Subject: [PATCH 06/11] Update operator.go --- workflow/controller/operator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index dcb4ea32e2f3..925be0f0d6fb 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -287,7 +287,7 @@ func (woc *wfOperationCtx) operate() { // In this case, Workflow will be requeued. woc.wf.Status.Message = fmt.Sprintf("Waiting for a PVC to be created. %v", err) woc.updated = true - woc.requeue(10) + woc.requeue(10 * time.Second) return } msg := "pvc create error" From fc00a5b093dc7e6ca0e047d45b14d4e1cec76ee5 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Tue, 21 Jul 2020 08:49:42 -0700 Subject: [PATCH 07/11] refactored --- workflow/controller/operator.go | 16 +++++++--------- workflow/controller/operator_test.go | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 7e13021375d7..eb4339d81724 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -284,10 +284,9 @@ func (woc *wfOperationCtx) operate() { if err != nil { if apierr.IsForbidden(err) { // Error was most likely caused by a lack of resources. - // In this case, Workflow will be requeued. - woc.wf.Status.Message = fmt.Sprintf("Waiting for a PVC to be created. %v", err) - woc.updated = true - woc.requeue(10 * time.Second) + // In this case, Workflow will be in pending state and requeue. + woc.markWorkflowPhase(wfv1.NodePending, false, fmt.Sprintf("Waiting for a PVC to be created. %v", err)) + woc.requeue(10) return } msg := "pvc create error" @@ -295,6 +294,9 @@ func (woc *wfOperationCtx) operate() { woc.markWorkflowError(err, true) woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", fmt.Sprintf("%s %s: %+v", woc.wf.ObjectMeta.Name, msg, err)) return + } else if woc.wf.Status.Phase == wfv1.NodePending { + // Workflow might be in pending state if previous PVC creation is forbidden + woc.markWorkflowRunning() } node, err := woc.executeTemplate(woc.wf.ObjectMeta.Name, execTmplRef, tmplCtx, execArgs, &executeTemplateOpts{}) @@ -1296,10 +1298,6 @@ func (woc *wfOperationCtx) createPVCs() error { }, } woc.wf.Status.PersistentVolumeClaims = append(woc.wf.Status.PersistentVolumeClaims, vol) - // Clearing previous PVC forbidden error - if woc.wf.Status.Message != "" && strings.Contains(woc.wf.Status.Message, "Waiting for a PVC to be created") { - woc.wf.Status.Message = "" - } woc.updated = true } return nil @@ -1682,7 +1680,7 @@ func (woc *wfOperationCtx) hasDaemonNodes() bool { } func (woc *wfOperationCtx) markWorkflowRunning() { - woc.markWorkflowPhase(wfv1.NodeRunning, false) + woc.markWorkflowPhase(wfv1.NodeRunning, false, "") } func (woc *wfOperationCtx) markWorkflowSuccess() { diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 95e79813df86..fa49d45d1020 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -4101,7 +4101,7 @@ status: defer cancel() woc := newWorkflowOperationCtx(wf, controller) woc.operate() - assert.Equal(t, wfv1.NodePending, woc.wf.Status.Phase) + assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase) for _, node := range woc.wf.Status.Nodes { switch node.TemplateName { case "main": From c81fbaa34e61b8df3db522b7ce4866d65f1e57e2 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Tue, 21 Jul 2020 14:07:36 -0700 Subject: [PATCH 08/11] Update util.go --- workflow/util/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/util/util.go b/workflow/util/util.go index 793bdcc36674..a092431b1bd0 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -566,7 +566,7 @@ func FormulateResubmitWorkflow(wf *wfv1.Workflow, memoized bool) (*wfv1.Workflow } newWF.Status.Conditions = wfv1.Conditions{{Status: metav1.ConditionFalse, Type: wfv1.ConditionTypeCompleted}} - newWF.Status.Phase = wfv1.NodePending + newWF.Status.Phase = "" return &newWF, nil } From c9108d359399ab62f00ccd1a603d71ff563ab700 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Tue, 21 Jul 2020 15:56:21 -0700 Subject: [PATCH 09/11] Update storage-limit.yaml --- test/e2e/testdata/storage-limit.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/test/e2e/testdata/storage-limit.yaml b/test/e2e/testdata/storage-limit.yaml index 7e5b679fed47..ffa55b358a61 100644 --- a/test/e2e/testdata/storage-limit.yaml +++ b/test/e2e/testdata/storage-limit.yaml @@ -5,7 +5,6 @@ metadata: labels: argo-e2e: true spec: - serviceAccountName: argo entrypoint: wait volumeClaimTemplates: # define volume, same syntax as k8s Pod spec - metadata: From 812d9ed3a687e68f3ad638e38551a029db963253 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Tue, 21 Jul 2020 22:18:43 -0700 Subject: [PATCH 10/11] Update storage-limit.yaml --- test/e2e/testdata/storage-limit.yaml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/e2e/testdata/storage-limit.yaml b/test/e2e/testdata/storage-limit.yaml index ffa55b358a61..44849069556a 100644 --- a/test/e2e/testdata/storage-limit.yaml +++ b/test/e2e/testdata/storage-limit.yaml @@ -18,6 +18,5 @@ spec: templates: - name: wait script: - image: alpine:latest - command: [sh, -c] - args: ["echo", "10s"] + image: argoproj/argosay:v2 + args: ["sleep", "2s"] From b271d726de5678c5ed07d6ce1e200fe83a89ef51 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 22 Jul 2020 07:55:45 -0700 Subject: [PATCH 11/11] Update storage-limit.yaml --- test/e2e/testdata/storage-limit.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/testdata/storage-limit.yaml b/test/e2e/testdata/storage-limit.yaml index 44849069556a..51b1e79644ad 100644 --- a/test/e2e/testdata/storage-limit.yaml +++ b/test/e2e/testdata/storage-limit.yaml @@ -19,4 +19,4 @@ spec: - name: wait script: image: argoproj/argosay:v2 - args: ["sleep", "2s"] + args: [echo, ":) Hello Argo!"]