Skip to content

Commit

Permalink
feat(executor): Wait for termination using pod watch for PNS and K8SA…
Browse files Browse the repository at this point in the history
…PI executors. (#4253)
  • Loading branch information
alexec authored Oct 22, 2020
1 parent 3156559 commit ec671dd
Show file tree
Hide file tree
Showing 13 changed files with 150 additions and 41 deletions.
58 changes: 45 additions & 13 deletions .github/workflows/ci-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,50 @@ on:

jobs:
tests:
name: Tests
name: Unit Tests
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Restore go build cache
uses: actions/cache@v1
with:
path: ~/.cache/go-build
key: ${{ runner.os }}-go-build-v1-${{ hashFiles('**/go.mod') }}
- name: Setup Golang
uses: actions/setup-go@v1
with:
go-version: "1.13.12"
- name: Add bins to PATH
run: |
echo /home/runner/go/bin >> $GITHUB_PATH
echo /usr/local/bin >> $GITHUB_PATH
- name: Run tests
env:
GOPATH: /home/runner/go
run: make test

e2e-tests:
name: E2E Tests
runs-on: ubuntu-latest
timeout-minutes: 20
strategy:
fail-fast: false
matrix:
test: ["test", "smoke", "test-e2e", "test-e2e-cron"]
# kubelet is not included because it'd take ages to get it working methinks
test: [ "smoke", "test-e2e", "test-e2e-cron" ]
containerRuntimeExecutor: [ "docker", "k8sapi", "pns" ]
# ok, so we're only running `smoke` for all CREs,
exclude:
- test: test-e2e
containerRuntimeExecutor: k8sapi
- test: test-e2e
containerRuntimeExecutor: pns
- test: test-e2e-cron
containerRuntimeExecutor: k8sapi
- test: test-e2e-cron
containerRuntimeExecutor: pns
steps:
- name: Checkout code
uses: actions/checkout@v2
Expand All @@ -32,18 +69,15 @@ jobs:
go-version: "1.13.12"
- name: Add bins to PATH
run: |
echo "::add-path::/home/runner/go/bin"
echo "::add-path::/usr/local/bin"
echo /home/runner/go/bin >> $GITHUB_PATH
echo /usr/local/bin >> $GITHUB_PATH
- name: Install and start K3S v1.18.8+k3s1
if: ${{ matrix.test != 'test' }}
run: curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION=v1.18.8+k3s1 INSTALL_K3S_CHANNEL=stable INSTALL_K3S_EXEC=--docker K3S_KUBECONFIG_MODE=644 sh - &
- name: Pre-pull images
if: ${{ matrix.test != 'test' }}
env:
GOPATH: /home/runner/go
run: make pull-build-images test-images &
- name: Create Kubeconfig
if: ${{ matrix.test != 'test' }}
run: |
mkdir -p ~/.kube
until stat /etc/rancher/k3s/k3s.yaml ; do sleep 10s ; done
Expand All @@ -52,7 +86,6 @@ jobs:
echo " user:" >> ~/.kube/config
echo " token: xxxxxx" >> ~/.kube/config
- name: Start Argo
if: ${{ matrix.test != 'test' }}
env:
GOPATH: /home/runner/go
PROFILE: mysql
Expand All @@ -63,9 +96,8 @@ jobs:
echo '127.0.0.1 mysql' | sudo tee -a /etc/hosts
mkdir -p /tmp/log/argo-e2e
git fetch --tags
KUBECONFIG=~/.kube/config make start PROFILE=$PROFILE E2E_EXECUTOR=docker DEV_IMAGE=true STATIC_FILES=false 2>&1 > /tmp/log/argo-e2e/argo.log &
KUBECONFIG=~/.kube/config make start PROFILE=$PROFILE E2E_EXECUTOR=${{matrix.containerRuntimeExecutor}} DEV_IMAGE=true STATIC_FILES=false 2>&1 > /tmp/log/argo-e2e/argo.log &
- name: Wait for Argo Server to be ready
if: ${{ matrix.test != 'test' }}
env:
GOPATH: /home/runner/go
run: make wait
Expand All @@ -74,7 +106,7 @@ jobs:
GOPATH: /home/runner/go
run: make ${{ matrix.test }}
- name: Upload logs
if: ${{ always() && matrix.test != 'test' }}
if: ${{ always() }}
uses: actions/upload-artifact@v1
with:
name: ${{ matrix.test }}-${{ github.run_id }}-argo.log
Expand All @@ -98,8 +130,8 @@ jobs:
go-version: "1.13.12"
- name: Add bins to PATH
run: |
echo "::add-path::/home/runner/go/bin"
echo "::add-path::/usr/local/bin"
echo /home/runner/go/bin >> $GITHUB_PATH
echo /usr/local/bin >> $GITHUB_PATH
- name: Install protoc
run: |
set -eux -o pipefail
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ mysql-cli:

.PHONY: test-e2e
test-e2e:
go test -timeout 15m -count 1 --tags e2e -p 1 --short ./test/e2e
go test -timeout 20m -count 1 --tags e2e -p 1 --short ./test/e2e

.PHONY: test-e2e-cron
test-e2e-cron:
Expand Down
2 changes: 1 addition & 1 deletion docs/empty-dir.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Empty Dir

While by default, the Docker and PNS [workflow executors](workflow-executors.md) can get output artifacts from the base layer (e.g. `/tmp`), neither the Kubelet or K8SAPI exectuors can. Nor are you likely to be able to get output artifacts from the base layer if you run your workflo pods a [security context](workflow-pod-security-context.md).
While by default, the Docker and PNS [workflow executors](workflow-executors.md) can get output artifacts from the base layer (e.g. `/tmp`), neither the Kubelet or the K8SAPI executors can. It is unlikely you can get output artifacts from the base layer if you run your workflow pods with a [security context](workflow-pod-security-context.md).

You can work around this constraint by mounting volumes onto your pod. The easiest way to do this is to use an `emptyDir` volume.

Expand Down
8 changes: 6 additions & 2 deletions test/e2e/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const defaultTimeout = 30 * time.Second

type E2ESuite struct {
suite.Suite
Config config.Config
Config *config.Config
Persistence *Persistence
RestConfig *rest.Config
wfClient v1alpha1.WorkflowInterface
Expand All @@ -53,11 +53,15 @@ func (s *E2ESuite) SetupSuite() {
s.CheckError(err)
s.KubeClient, err = kubernetes.NewForConfig(s.RestConfig)
s.CheckError(err)
configController := config.NewController(Namespace, "workflow-controller-configmap", s.KubeClient, config.EmptyConfigFunc)
c, err := configController.Get()
s.CheckError(err)
s.Config = c.(*config.Config)
s.wfClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().Workflows(Namespace)
s.wfebClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowEventBindings(Namespace)
s.wfTemplateClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowTemplates(Namespace)
s.cronClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().CronWorkflows(Namespace)
s.Persistence = newPersistence(s.KubeClient)
s.Persistence = newPersistence(s.KubeClient, s.Config)
s.hydrator = hydrator.New(s.Persistence.offloadNodeStatusRepo)
s.cwfTemplateClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().ClusterWorkflowTemplates()
}
Expand Down
8 changes: 1 addition & 7 deletions test/e2e/fixtures/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@ type Persistence struct {
workflowArchive sqldb.WorkflowArchive
}

func newPersistence(kubeClient kubernetes.Interface) *Persistence {
configController := config.NewController(Namespace, "workflow-controller-configmap", kubeClient, config.EmptyConfigFunc)
v, err := configController.Get()
if err != nil {
panic(err)
}
wcConfig := v.(*config.Config)
func newPersistence(kubeClient kubernetes.Interface, wcConfig *config.Config) *Persistence {
persistence := wcConfig.Persistence
if persistence != nil {
if persistence.PostgreSQL != nil {
Expand Down
10 changes: 7 additions & 3 deletions test/e2e/images/argosay/v2/main/argosay.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,13 @@ func echo(args []string) error {
return nil
case 2:
file := args[1]
err := os.MkdirAll(filepath.Dir(file), 0777)
if err != nil {
return err
dir := filepath.Dir(file)
_, err := os.Stat(dir)
if os.IsNotExist(err) {
err := os.MkdirAll(dir, 0777)
if err != nil {
return err
}
}
err = ioutil.WriteFile(file, []byte(args[0]), 0666)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions test/e2e/smoke/runasnonroot-workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: runasnonroot-
  labels:
    # Kubernetes label values must be strings; quote so YAML does not
    # parse this as a boolean and fail strict unmarshalling.
    argo-e2e: "true"
spec:
  entrypoint: main
  # Run every container as a non-root user to exercise executors that
  # must work without root (PNS/K8SAPI).
  securityContext:
    runAsNonRoot: true
    runAsUser: 8737
  templates:
    - name: main
      container:
        image: argoproj/argosay:v2
19 changes: 19 additions & 0 deletions test/e2e/smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test/e2e/fixtures"
"github.com/argoproj/argo/workflow/common"
)

type SmokeSuite struct {
Expand All @@ -32,7 +33,25 @@ func (s *SmokeSuite) TestBasicWorkflow() {
})
}

// TestRunAsNonRootWorkflow submits a workflow whose pod security context
// forbids running as root and expects it to succeed. The Docker executor
// cannot run non-root workloads this way, so it is skipped.
func (s *SmokeSuite) TestRunAsNonRootWorkflow() {
	if s.Config.ContainerRuntimeExecutor == common.ContainerRuntimeExecutorDocker {
		s.T().Skip("docker not supported")
	}
	when := s.Given().
		Workflow("@smoke/runasnonroot-workflow.yaml").
		When().
		SubmitWorkflow().
		WaitForWorkflow()
	when.Then().
		ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
			assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
		})
}

func (s *SmokeSuite) TestArtifactPassing() {
if s.Config.ContainerRuntimeExecutor != common.ContainerRuntimeExecutorDocker {
s.T().Skip("non-docker not supported")
}
s.Given().
Workflow("@smoke/artifact-passing.yaml").
When().
Expand Down
47 changes: 47 additions & 0 deletions workflow/executor/common/wait/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package wait

import (
"fmt"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/argoproj/argo/workflow/executor/common"
)

// UntilTerminated blocks until the container with the given containerID in the
// named pod reaches a terminated state, re-establishing the pod watch whenever
// the API server drops it.
func UntilTerminated(kubernetesInterface kubernetes.Interface, namespace, podName, containerID string) error {
	log.Infof("Waiting for container %s to be terminated", containerID)
	podInterface := kubernetesInterface.CoreV1().Pods(namespace)
	listOptions := metav1.ListOptions{FieldSelector: "metadata.name=" + podName}
	for {
		// listOptions is passed by pointer so the ResourceVersion bookmark
		// recorded by each watch survives into the next retry.
		done, err := untilTerminatedAux(podInterface, containerID, &listOptions)
		if done {
			return err
		}
	}
}

// untilTerminatedAux runs a single pod watch. It returns (true, err) when the
// container has terminated or a fatal error occurred, and (false, err) when the
// watch should be re-established. It advances listOptions.ResourceVersion as
// events arrive so a subsequent watch resumes where this one left off.
func untilTerminatedAux(podInterface v1.PodInterface, containerID string, listOptions *metav1.ListOptions) (bool, error) {
	w, err := podInterface.Watch(*listOptions)
	if err != nil {
		return true, fmt.Errorf("could not watch pod: %w", err)
	}
	defer w.Stop()
	for event := range w.ResultChan() {
		pod, ok := event.Object.(*corev1.Pod)
		if !ok {
			// Not a pod (e.g. a Status error event): surface it and retry.
			return false, apierrors.FromObject(event.Object)
		}
		for _, s := range pod.Status.ContainerStatuses {
			if common.GetContainerID(&s) == containerID && s.State.Terminated != nil {
				return true, nil
			}
		}
		listOptions.ResourceVersion = pod.ResourceVersion
	}
	// The server closed the watch (routine watch timeout) before the container
	// terminated: signal the caller to re-watch instead of falsely reporting
	// termination.
	return false, nil
}
5 changes: 0 additions & 5 deletions workflow/executor/k8sapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io"
"syscall"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -79,10 +78,6 @@ func (c *k8sAPIClient) GetContainerStatus(containerID string) (*corev1.Pod, *cor
return nil, nil, errors.New(errors.CodeNotFound, fmt.Sprintf("containerID %q is not found in the pod %s", containerID, c.podName))
}

func (c *k8sAPIClient) waitForTermination(containerID string, timeout time.Duration) error {
return execcommon.WaitForTermination(c, containerID, timeout)
}

func (c *k8sAPIClient) KillContainer(pod *corev1.Pod, container *corev1.ContainerStatus, sig syscall.Signal) error {
command := []string{"/bin/sh", "-c", fmt.Sprintf("kill -%d 1", sig)}
exec, err := common.ExecPodContainer(c.config, c.namespace, c.podName, container.Name, false, true, command...)
Expand Down
4 changes: 2 additions & 2 deletions workflow/executor/k8sapi/k8sapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
restclient "k8s.io/client-go/rest"

"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/workflow/executor/common/wait"
)

type K8sAPIExecutor struct {
Expand Down Expand Up @@ -60,8 +61,7 @@ func (k *K8sAPIExecutor) WaitInit() error {

// Wait for the container to complete
func (k *K8sAPIExecutor) Wait(containerID string) error {
log.Infof("Waiting for container %s to complete", containerID)
return k.client.waitForTermination(containerID, 0)
return wait.UntilTerminated(k.client.clientset, k.client.namespace, k.client.podName, containerID)
}

// Kill kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
Expand Down
9 changes: 4 additions & 5 deletions workflow/executor/pns/pns.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/argoproj/argo/util/archive"
"github.com/argoproj/argo/workflow/common"
execcommon "github.com/argoproj/argo/workflow/executor/common"
"github.com/argoproj/argo/workflow/executor/common/wait"
os_specific "github.com/argoproj/argo/workflow/executor/os-specific"
)

Expand All @@ -40,8 +41,6 @@ type PNSExecutor struct {

// thisPID is the pid of this process
thisPID int
// mainPID holds the main container's pid
mainPID int
// mainFS holds a file descriptor to the main filesystem, allowing the executor to access the
// filesystem after the main process exited
mainFS *os.File
Expand Down Expand Up @@ -163,16 +162,16 @@ func (p *PNSExecutor) WaitInit() error {
func (p *PNSExecutor) Wait(containerID string) error {
mainPID, err := p.getContainerPID(containerID)
if err != nil {
log.Warnf("Failed to get main PID: %v", err)
if !p.hasOutputs {
log.Warnf("Ignoring wait failure: %v. Process assumed to have completed", err)
return nil
}
return err
return wait.UntilTerminated(p.clientset, p.namespace, p.podName, containerID)
}
log.Infof("Main pid identified as %d", mainPID)
p.mainPID = mainPID
for pid, f := range p.pidFileHandles {
if pid == p.mainPID {
if pid == mainPID {
log.Info("Successfully secured file handle on main container root filesystem")
p.mainFS = &f.file
} else {
Expand Down
4 changes: 2 additions & 2 deletions workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ func (ctx *templateValidationCtx) validateBaseImageOutputs(tmpl *wfv1.Template)
// docker executor supports all modes of artifact outputs
case common.ContainerRuntimeExecutorPNS:
// pns supports copying from the base image, but only if there is no volume mount underneath it
errMsg := "pns executor does not support outputs from base image layer with volume mounts. must use emptyDir"
errMsg := "pns executor does not support outputs from base image layer with volume mounts. Use an emptyDir: https://argoproj.github.io/argo/empty-dir/"
for _, out := range tmpl.Outputs.Artifacts {
if common.FindOverlappingVolume(tmpl, out.Path) == nil {
// output is in the base image layer. need to verify there are no volume mounts under it
Expand All @@ -1045,7 +1045,7 @@ func (ctx *templateValidationCtx) validateBaseImageOutputs(tmpl *wfv1.Template)
}
case common.ContainerRuntimeExecutorK8sAPI, common.ContainerRuntimeExecutorKubelet:
// for kubelet/k8s fail validation if we detect artifact is copied from base image layer
errMsg := fmt.Sprintf("%s executor does not support outputs from base image layer. must use emptyDir", ctx.ContainerRuntimeExecutor)
errMsg := fmt.Sprintf("%s executor does not support outputs from base image layer. Use an emptyDir: https://argoproj.github.io/argo/empty-dir/", ctx.ContainerRuntimeExecutor)
for _, out := range tmpl.Outputs.Artifacts {
if common.FindOverlappingVolume(tmpl, out.Path) == nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.artifacts.%s: %s", tmpl.Name, out.Name, errMsg)
Expand Down

0 comments on commit ec671dd

Please sign in to comment.