From ec671ddceb1c8d18fa0410e22106659a1572683c Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Thu, 22 Oct 2020 08:16:35 -0700 Subject: [PATCH] feat(executor): Wait for termination using pod watch for PNS and K8SAPI executors. (#4253) --- .github/workflows/ci-build.yaml | 58 +++++++++++++++++----- Makefile | 2 +- docs/empty-dir.md | 2 +- test/e2e/fixtures/e2e_suite.go | 8 ++- test/e2e/fixtures/persistence.go | 8 +-- test/e2e/images/argosay/v2/main/argosay.go | 10 ++-- test/e2e/smoke/runasnonroot-workflow.yaml | 15 ++++++ test/e2e/smoke_test.go | 19 +++++++ workflow/executor/common/wait/wait.go | 47 ++++++++++++++++++ workflow/executor/k8sapi/client.go | 5 -- workflow/executor/k8sapi/k8sapi.go | 4 +- workflow/executor/pns/pns.go | 9 ++-- workflow/validate/validate.go | 4 +- 13 files changed, 150 insertions(+), 41 deletions(-) create mode 100644 test/e2e/smoke/runasnonroot-workflow.yaml create mode 100644 workflow/executor/common/wait/wait.go diff --git a/.github/workflows/ci-build.yaml b/.github/workflows/ci-build.yaml index 594df61bc476..ad7b652307c9 100644 --- a/.github/workflows/ci-build.yaml +++ b/.github/workflows/ci-build.yaml @@ -11,13 +11,50 @@ on: jobs: tests: - name: Tests + name: Unit Tests + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - name: Checkout code + uses: actions/checkout@v2 + - name: Restore go build cache + uses: actions/cache@v1 + with: + path: ~/.cache/go-build + key: ${{ runner.os }}-go-build-v1-${{ hashFiles('**/go.mod') }} + - name: Setup Golang + uses: actions/setup-go@v1 + with: + go-version: "1.13.12" + - name: Add bins to PATH + run: | + echo /home/runner/go/bin >> $GITHUB_PATH + echo /usr/local/bin >> $GITHUB_PATH + - name: Run tests + env: + GOPATH: /home/runner/go + run: make test + + e2e-tests: + name: E2E Tests runs-on: ubuntu-latest timeout-minutes: 20 strategy: fail-fast: false matrix: - test: ["test", "smoke", "test-e2e", "test-e2e-cron"] + # kubelet is not included because it'd take ages to get it working methinks + test: [ "smoke", "test-e2e", "test-e2e-cron" ] + containerRuntimeExecutor: [ "docker", "k8sapi", "pns" ] + # ok, so we're only running `smoke` for all CREs, + exclude: + - test: test-e2e + containerRuntimeExecutor: k8sapi + - test: test-e2e + containerRuntimeExecutor: pns + - test: test-e2e-cron + containerRuntimeExecutor: k8sapi + - test: test-e2e-cron + containerRuntimeExecutor: pns steps: - name: Checkout code uses: actions/checkout@v2 @@ -32,18 +69,15 @@ jobs: go-version: "1.13.12" - name: Add bins to PATH run: | - echo "::add-path::/home/runner/go/bin" - echo "::add-path::/usr/local/bin" + echo /home/runner/go/bin >> $GITHUB_PATH + echo /usr/local/bin >> $GITHUB_PATH - name: Install and start K3S v1.18.8+k3s1 - if: ${{ matrix.test != 'test' }} run: curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION=v1.18.8+k3s1 INSTALL_K3S_CHANNEL=stable INSTALL_K3S_EXEC=--docker K3S_KUBECONFIG_MODE=644 sh - & - name: Pre-pull images - if: ${{ matrix.test != 'test' }} env: GOPATH: /home/runner/go run: make pull-build-images test-images & - name: Create Kubeconfig - if: ${{ matrix.test != 'test' }} run: | mkdir -p ~/.kube until stat /etc/rancher/k3s/k3s.yaml ; do sleep 10s ; done @@ -52,7 +86,6 @@ jobs: echo " user:" >> ~/.kube/config echo " token: xxxxxx" >> ~/.kube/config - name: Start Argo - if: ${{ matrix.test != 'test' }} env: GOPATH: /home/runner/go PROFILE: mysql @@ -63,9 +96,8 @@ jobs: echo '127.0.0.1 mysql' | sudo tee -a /etc/hosts mkdir -p /tmp/log/argo-e2e git fetch --tags - KUBECONFIG=~/.kube/config make start PROFILE=$PROFILE 
E2E_EXECUTOR=docker DEV_IMAGE=true STATIC_FILES=false 2>&1 > /tmp/log/argo-e2e/argo.log & + KUBECONFIG=~/.kube/config make start PROFILE=$PROFILE E2E_EXECUTOR=${{matrix.containerRuntimeExecutor}} DEV_IMAGE=true STATIC_FILES=false 2>&1 > /tmp/log/argo-e2e/argo.log & - name: Wait for Argo Server to be ready - if: ${{ matrix.test != 'test' }} env: GOPATH: /home/runner/go run: make wait @@ -74,7 +106,7 @@ jobs: GOPATH: /home/runner/go run: make ${{ matrix.test }} - name: Upload logs - if: ${{ always() && matrix.test != 'test' }} + if: ${{ always() }} uses: actions/upload-artifact@v1 with: name: ${{ matrix.test }}-${{ github.run_id }}-argo.log @@ -98,8 +130,8 @@ jobs: go-version: "1.13.12" - name: Add bins to PATH run: | - echo "::add-path::/home/runner/go/bin" - echo "::add-path::/usr/local/bin" + echo /home/runner/go/bin >> $GITHUB_PATH + echo /usr/local/bin >> $GITHUB_PATH - name: Install protoc run: | set -eux -o pipefail diff --git a/Makefile b/Makefile index 4387ba60a538..cb243d05a2dd 100644 --- a/Makefile +++ b/Makefile @@ -461,7 +461,7 @@ mysql-cli: .PHONY: test-e2e test-e2e: - go test -timeout 15m -count 1 --tags e2e -p 1 --short ./test/e2e + go test -timeout 20m -count 1 --tags e2e -p 1 --short ./test/e2e .PHONY: test-e2e-cron test-e2e-cron: diff --git a/docs/empty-dir.md b/docs/empty-dir.md index c24613719736..e11b1f5e5238 100644 --- a/docs/empty-dir.md +++ b/docs/empty-dir.md @@ -1,6 +1,6 @@ # Empty Dir -While by default, the Docker and PNS [workflow executors](workflow-executors.md) can get output artifacts from the base layer (e.g. `/tmp`), neither the Kubelet or K8SAPI exectuors can. Nor are you likely to be able to get output artifacts from the base layer if you run your workflo pods a [security context](workflow-pod-security-context.md). +While by default, the Docker and PNS [workflow executors](workflow-executors.md) can get output artifacts from the base layer (e.g. `/tmp`), neither the Kubelet nor the K8SAPI executors can. It is unlikely you can get output artifacts from the base layer if you run your workflow pods with a [security context](workflow-pod-security-context.md). You can work-around this constraint by mounting volumes onto your pod. The easiest way to do this is to use as `emptytDir` volume.
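As a concrete illustration of the `emptyDir` work-around described in the doc above, here is a minimal sketch of a workflow that writes its output artifact onto a mounted `emptyDir` volume instead of the base image layer. The template name, image arguments, mount path, and artifact name are illustrative only and are not part of this patch:

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: empty-dir-
spec:
  entrypoint: main
  templates:
    - name: main
      container:
        image: argoproj/argosay:v2
        # argosay's echo writes "hello" to the given file (path is illustrative)
        args: [ echo, hello, /mnt/out/hello.txt ]
        volumeMounts:
          # mount an emptyDir volume at the directory the artifact is written to,
          # so non-Docker executors can read it back without touching the base layer
          - name: out
            mountPath: /mnt/out
      volumes:
        - name: out
          emptyDir: { }
      outputs:
        artifacts:
          - name: hello
            path: /mnt/out/hello.txt
```

This is the same pattern that the updated validation messages in `workflow/validate/validate.go` later in this patch point users to.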
diff --git a/test/e2e/fixtures/e2e_suite.go b/test/e2e/fixtures/e2e_suite.go index e6bf59870cfe..75c6e8f1df60 100644 --- a/test/e2e/fixtures/e2e_suite.go +++ b/test/e2e/fixtures/e2e_suite.go @@ -35,7 +35,7 @@ const defaultTimeout = 30 * time.Second type E2ESuite struct { suite.Suite - Config config.Config + Config *config.Config Persistence *Persistence RestConfig *rest.Config wfClient v1alpha1.WorkflowInterface @@ -53,11 +53,15 @@ func (s *E2ESuite) SetupSuite() { s.CheckError(err) s.KubeClient, err = kubernetes.NewForConfig(s.RestConfig) s.CheckError(err) + configController := config.NewController(Namespace, "workflow-controller-configmap", s.KubeClient, config.EmptyConfigFunc) + c, err := configController.Get() + s.CheckError(err) + s.Config = c.(*config.Config) s.wfClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().Workflows(Namespace) s.wfebClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowEventBindings(Namespace) s.wfTemplateClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowTemplates(Namespace) s.cronClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().CronWorkflows(Namespace) - s.Persistence = newPersistence(s.KubeClient) + s.Persistence = newPersistence(s.KubeClient, s.Config) s.hydrator = hydrator.New(s.Persistence.offloadNodeStatusRepo) s.cwfTemplateClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().ClusterWorkflowTemplates() } diff --git a/test/e2e/fixtures/persistence.go b/test/e2e/fixtures/persistence.go index 8a0d03a2ba3d..f8f788c71587 100644 --- a/test/e2e/fixtures/persistence.go +++ b/test/e2e/fixtures/persistence.go @@ -15,13 +15,7 @@ type Persistence struct { workflowArchive sqldb.WorkflowArchive } -func newPersistence(kubeClient kubernetes.Interface) *Persistence { - configController := config.NewController(Namespace, "workflow-controller-configmap", kubeClient, config.EmptyConfigFunc) - v, err := configController.Get() - if err != nil { - panic(err) - } - wcConfig := v.(*config.Config) +func newPersistence(kubeClient kubernetes.Interface, wcConfig *config.Config) *Persistence { persistence := wcConfig.Persistence if persistence != nil { if persistence.PostgreSQL != nil { diff --git a/test/e2e/images/argosay/v2/main/argosay.go b/test/e2e/images/argosay/v2/main/argosay.go index 6a9791b1e90f..06ee84ba9fcd 100644 --- a/test/e2e/images/argosay/v2/main/argosay.go +++ b/test/e2e/images/argosay/v2/main/argosay.go @@ -81,9 +81,13 @@ func echo(args []string) error { return nil case 2: file := args[1] - err := os.MkdirAll(filepath.Dir(file), 0777) - if err != nil { - return err + dir := filepath.Dir(file) + _, err := os.Stat(dir) + if os.IsNotExist(err) { + err := os.MkdirAll(dir, 0777) + if err != nil { + return err + } } err = ioutil.WriteFile(file, []byte(args[0]), 0666) if err != nil { diff --git a/test/e2e/smoke/runasnonroot-workflow.yaml b/test/e2e/smoke/runasnonroot-workflow.yaml new file mode 100644 index 000000000000..7bac579ad3f8 --- /dev/null +++ b/test/e2e/smoke/runasnonroot-workflow.yaml @@ -0,0 +1,15 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: runasnonroot- + labels: + argo-e2e: true +spec: + entrypoint: main + securityContext: + runAsNonRoot: true + runAsUser: 8737 + templates: + - name: main + container: + image: argoproj/argosay:v2 \ No newline at end of file diff --git a/test/e2e/smoke_test.go b/test/e2e/smoke_test.go index e583f7a9f2f2..925fa20ee387 100644 --- a/test/e2e/smoke_test.go +++ b/test/e2e/smoke_test.go @@ 
-12,6 +12,7 @@ import ( wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/test/e2e/fixtures" + "github.com/argoproj/argo/workflow/common" ) type SmokeSuite struct { @@ -32,7 +33,25 @@ func (s *SmokeSuite) TestBasicWorkflow() { }) } +func (s *SmokeSuite) TestRunAsNonRootWorkflow() { + if s.Config.ContainerRuntimeExecutor == common.ContainerRuntimeExecutorDocker { + s.T().Skip("docker not supported") + } + s.Given(). + Workflow("@smoke/runasnonroot-workflow.yaml"). + When(). + SubmitWorkflow(). + WaitForWorkflow(). + Then(). + ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + assert.Equal(t, wfv1.NodeSucceeded, status.Phase) + }) +} + func (s *SmokeSuite) TestArtifactPassing() { + if s.Config.ContainerRuntimeExecutor != common.ContainerRuntimeExecutorDocker { + s.T().Skip("non-docker not supported") + } s.Given(). Workflow("@smoke/artifact-passing.yaml"). When(). diff --git a/workflow/executor/common/wait/wait.go b/workflow/executor/common/wait/wait.go new file mode 100644 index 000000000000..0b5910c5b751 --- /dev/null +++ b/workflow/executor/common/wait/wait.go @@ -0,0 +1,47 @@ +package wait + +import ( + "fmt" + + log "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + + "github.com/argoproj/argo/workflow/executor/common" +) + +func UntilTerminated(kubernetesInterface kubernetes.Interface, namespace, podName, containerID string) error { + log.Infof("Waiting for container %s to be terminated", containerID) + podInterface := kubernetesInterface.CoreV1().Pods(namespace) + listOptions := metav1.ListOptions{FieldSelector: "metadata.name=" + podName} + for { + done, err := untilTerminatedAux(podInterface, containerID, listOptions) + if done { + return err + } + } +} + +func untilTerminatedAux(podInterface v1.PodInterface, containerID string, listOptions metav1.ListOptions) (bool, error) { + w, err := podInterface.Watch(listOptions) + if err != nil { + return true, fmt.Errorf("could not watch pod: %w", err) + } + defer w.Stop() + for event := range w.ResultChan() { + pod, ok := event.Object.(*corev1.Pod) + if !ok { + return false, apierrors.FromObject(event.Object) + } + for _, s := range pod.Status.ContainerStatuses { + if common.GetContainerID(&s) == containerID && s.State.Terminated != nil { + return true, nil + } + } + listOptions.ResourceVersion = pod.ResourceVersion + } + return true, nil +} diff --git a/workflow/executor/k8sapi/client.go b/workflow/executor/k8sapi/client.go index c2434a78a99d..204c028c40a2 100644 --- a/workflow/executor/k8sapi/client.go +++ b/workflow/executor/k8sapi/client.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "syscall" - "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -79,10 +78,6 @@ func (c *k8sAPIClient) GetContainerStatus(containerID string) (*corev1.Pod, *cor return nil, nil, errors.New(errors.CodeNotFound, fmt.Sprintf("containerID %q is not found in the pod %s", containerID, c.podName)) } -func (c *k8sAPIClient) waitForTermination(containerID string, timeout time.Duration) error { - return execcommon.WaitForTermination(c, containerID, timeout) -} - func (c *k8sAPIClient) KillContainer(pod *corev1.Pod, container *corev1.ContainerStatus, sig syscall.Signal) error { command := []string{"/bin/sh", "-c", fmt.Sprintf("kill -%d 1", sig)} exec, err := 
common.ExecPodContainer(c.config, c.namespace, c.podName, container.Name, false, true, command...) diff --git a/workflow/executor/k8sapi/k8sapi.go b/workflow/executor/k8sapi/k8sapi.go index 8fe7b3852fc2..3c922c77ccd0 100644 --- a/workflow/executor/k8sapi/k8sapi.go +++ b/workflow/executor/k8sapi/k8sapi.go @@ -9,6 +9,7 @@ import ( restclient "k8s.io/client-go/rest" "github.com/argoproj/argo/errors" + "github.com/argoproj/argo/workflow/executor/common/wait" ) type K8sAPIExecutor struct { @@ -60,8 +61,7 @@ func (k *K8sAPIExecutor) WaitInit() error { // Wait for the container to complete func (k *K8sAPIExecutor) Wait(containerID string) error { - log.Infof("Waiting for container %s to complete", containerID) - return k.client.waitForTermination(containerID, 0) + return wait.UntilTerminated(k.client.clientset, k.client.namespace, k.client.podName, containerID) } // Kill kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index f3a0f584552a..07c235cd2bf2 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -22,6 +22,7 @@ import ( "github.com/argoproj/argo/util/archive" "github.com/argoproj/argo/workflow/common" execcommon "github.com/argoproj/argo/workflow/executor/common" + "github.com/argoproj/argo/workflow/executor/common/wait" os_specific "github.com/argoproj/argo/workflow/executor/os-specific" ) @@ -40,8 +41,6 @@ type PNSExecutor struct { // thisPID is the pid of this process thisPID int - // mainPID holds the main container's pid - mainPID int // mainFS holds a file descriptor to the main filesystem, allowing the executor to access the // filesystem after the main process exited mainFS *os.File @@ -163,16 +162,16 @@ func (p *PNSExecutor) WaitInit() error { func (p *PNSExecutor) Wait(containerID string) error { mainPID, err := p.getContainerPID(containerID) if err != nil { + log.Warnf("Failed to get main PID: %v", err) if !p.hasOutputs { log.Warnf("Ignoring wait failure: %v. Process assumed to have completed", err) return nil } - return err + return wait.UntilTerminated(p.clientset, p.namespace, p.podName, containerID) } log.Infof("Main pid identified as %d", mainPID) - p.mainPID = mainPID for pid, f := range p.pidFileHandles { - if pid == p.mainPID { + if pid == mainPID { log.Info("Successfully secured file handle on main container root filesystem") p.mainFS = &f.file } else { diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go index b4437060e40e..153bd04277cc 100644 --- a/workflow/validate/validate.go +++ b/workflow/validate/validate.go @@ -1022,7 +1022,7 @@ func (ctx *templateValidationCtx) validateBaseImageOutputs(tmpl *wfv1.Template) // docker executor supports all modes of artifact outputs case common.ContainerRuntimeExecutorPNS: // pns supports copying from the base image, but only if there is no volume mount underneath it - errMsg := "pns executor does not support outputs from base image layer with volume mounts. must use emptyDir" + errMsg := "pns executor does not support outputs from base image layer with volume mounts. Use an emptyDir: https://argoproj.github.io/argo/empty-dir/" for _, out := range tmpl.Outputs.Artifacts { if common.FindOverlappingVolume(tmpl, out.Path) == nil { // output is in the base image layer. 
need to verify there are no volume mounts under it @@ -1045,7 +1045,7 @@ func (ctx *templateValidationCtx) validateBaseImageOutputs(tmpl *wfv1.Template) } case common.ContainerRuntimeExecutorK8sAPI, common.ContainerRuntimeExecutorKubelet: // for kubelet/k8s fail validation if we detect artifact is copied from base image layer - errMsg := fmt.Sprintf("%s executor does not support outputs from base image layer. must use emptyDir", ctx.ContainerRuntimeExecutor) + errMsg := fmt.Sprintf("%s executor does not support outputs from base image layer. Use an emptyDir: https://argoproj.github.io/argo/empty-dir/", ctx.ContainerRuntimeExecutor) for _, out := range tmpl.Outputs.Artifacts { if common.FindOverlappingVolume(tmpl, out.Path) == nil { return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.artifacts.%s: %s", tmpl.Name, out.Name, errMsg)