feat(executor): Wait for termination using pod watch for PNS and K8SAPI executors. #4253

Merged
merged 64 commits into master from alexec:pod-watch
Oct 22, 2020
Commits (64)
a02ca40
pod-watch
alexec Oct 9, 2020
c2fdd9a
feat(executor): Wait for termination using pod watch for PNS and K8SA…
alexec Oct 9, 2020
0714866
pod-watch
alexec Oct 9, 2020
ef1da0e
ok
alexec Oct 9, 2020
ae7f695
ok
alexec Oct 9, 2020
2e19290
pod-watch
alexec Oct 9, 2020
1dada9c
pod-watch
alexec Oct 9, 2020
2293d20
pod-watch
alexec Oct 9, 2020
e9a8e1c
redundant check
alexec Oct 9, 2020
1c1b591
pod-watch
alexec Oct 9, 2020
3a4f2c6
pod-watch
alexec Oct 9, 2020
143fdc3
pod-watch
alexec Oct 9, 2020
9c20176
pod-watch
alexec Oct 9, 2020
4463e3f
trigger
alexec Oct 9, 2020
e0bb766
pod-watch
alexec Oct 9, 2020
d271b49
pod-watch
alexec Oct 9, 2020
df4d87c
pod-watch
alexec Oct 9, 2020
0d98c8c
pod-watch
alexec Oct 9, 2020
84fdaee
pod-watch
alexec Oct 10, 2020
0076855
pod-watch
alexec Oct 10, 2020
7ad9e55
pod-watch
alexec Oct 10, 2020
8c8aaf0
pod-watch
alexec Oct 10, 2020
4e276c3
pod-watch
alexec Oct 10, 2020
7da8a8e
pod-watch
alexec Oct 10, 2020
c7057db
pod-watch
alexec Oct 10, 2020
c9ab875
pod-watch
alexec Oct 10, 2020
d91280d
Merge branch 'master' into pod-watch
alexec Oct 13, 2020
5e0b9ea
pod-watch
alexec Oct 13, 2020
f0f0cfa
ok
alexec Oct 13, 2020
18662af
Merge branch 'master' into pod-watch
alexec Oct 13, 2020
2d5dc03
pod-watch
alexec Oct 13, 2020
211a6f0
ok
alexec Oct 13, 2020
69795ec
Merge branch 'master' into pod-watch
alexec Oct 13, 2020
84bf5bc
pod-watch
alexec Oct 13, 2020
cf079c7
pod-watch
alexec Oct 13, 2020
6ba8e43
fix + tidy up
alexec Oct 14, 2020
095820a
fix test
alexec Oct 14, 2020
af25935
revert
alexec Oct 14, 2020
760f998
fix test
alexec Oct 14, 2020
fa2bb04
simplify test
alexec Oct 14, 2020
7601f16
revert
alexec Oct 14, 2020
2769145
v2
alexec Oct 14, 2020
84ce095
pod-watch
alexec Oct 14, 2020
69a8b26
pod-watch
alexec Oct 14, 2020
5b97a6e
pod-watch
alexec Oct 14, 2020
83b8ae5
pod-watch
alexec Oct 14, 2020
4c079a8
pod-watch
alexec Oct 14, 2020
be22057
Merge branch 'master' into pod-watch
alexec Oct 14, 2020
f48a450
pod-watch
alexec Oct 14, 2020
4cfabef
pod-watch
alexec Oct 14, 2020
b326d7e
Merge branch 'master' into pod-watch
alexec Oct 14, 2020
f15846f
Merge branch 'master' into pod-watch
alexec Oct 14, 2020
50a8dc7
Merge branch 'master' into pod-watch
alexec Oct 14, 2020
87eacb2
pod-watch
alexec Oct 14, 2020
d778686
Update smoke_test.go
alexec Oct 14, 2020
340577c
Merge branch 'master' into pod-watch
alexec Oct 19, 2020
0782c60
Merge branch 'master' into pod-watch
alexec Oct 22, 2020
d1eaffe
Merge branch 'pod-watch' of github.com:alexec/argo into pod-watch
alexec Oct 22, 2020
d7805c4
Merge branch 'master' into pod-watch
alexec Oct 22, 2020
62d777f
pod-watch
alexec Oct 22, 2020
2aac204
Revert "pod-watch"
alexec Oct 22, 2020
3297684
pod-watch
alexec Oct 22, 2020
b5b6eac
pod-watch
alexec Oct 22, 2020
1e2e3de
pod-watch
alexec Oct 22, 2020
58 changes: 45 additions & 13 deletions .github/workflows/ci-build.yaml
@@ -11,13 +11,50 @@ on:

jobs:
tests:
name: Tests
name: Unit Tests
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Restore go build cache
uses: actions/cache@v1
with:
path: ~/.cache/go-build
key: ${{ runner.os }}-go-build-v1-${{ hashFiles('**/go.mod') }}
- name: Setup Golang
uses: actions/setup-go@v1
with:
go-version: "1.13.12"
- name: Add bins to PATH
run: |
echo /home/runner/go/bin >> $GITHUB_PATH
echo /usr/local/bin >> $GITHUB_PATH
- name: Run tests
env:
GOPATH: /home/runner/go
run: make test

e2e-tests:
name: E2E Tests
runs-on: ubuntu-latest
timeout-minutes: 20
strategy:
fail-fast: false
matrix:
test: ["test", "smoke", "test-e2e", "test-e2e-cron"]
# kubelet is not included because it'd take ages to get it working methinks
test: [ "smoke", "test-e2e", "test-e2e-cron" ]
containerRuntimeExecutor: [ "docker", "k8sapi", "pns" ]
# ok, so we're only running `smoke` for all CREs,
exclude:
- test: test-e2e
containerRuntimeExecutor: k8sapi
- test: test-e2e
containerRuntimeExecutor: pns
- test: test-e2e-cron
containerRuntimeExecutor: k8sapi
- test: test-e2e-cron
containerRuntimeExecutor: pns
steps:
- name: Checkout code
uses: actions/checkout@v2
@@ -32,18 +69,15 @@ jobs:
go-version: "1.13.12"
- name: Add bins to PATH
run: |
echo "::add-path::/home/runner/go/bin"
echo "::add-path::/usr/local/bin"
echo /home/runner/go/bin >> $GITHUB_PATH
Contributor Author comment: security fix for our actions
echo /usr/local/bin >> $GITHUB_PATH
- name: Install and start K3S v1.18.8+k3s1
if: ${{ matrix.test != 'test' }}
run: curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION=v1.18.8+k3s1 INSTALL_K3S_CHANNEL=stable INSTALL_K3S_EXEC=--docker K3S_KUBECONFIG_MODE=644 sh - &
- name: Pre-pull images
if: ${{ matrix.test != 'test' }}
env:
GOPATH: /home/runner/go
run: make pull-build-images test-images &
- name: Create Kubeconfig
if: ${{ matrix.test != 'test' }}
run: |
mkdir -p ~/.kube
until stat /etc/rancher/k3s/k3s.yaml ; do sleep 10s ; done
@@ -52,7 +86,6 @@ jobs:
echo " user:" >> ~/.kube/config
echo " token: xxxxxx" >> ~/.kube/config
- name: Start Argo
if: ${{ matrix.test != 'test' }}
env:
GOPATH: /home/runner/go
PROFILE: mysql
@@ -63,9 +96,8 @@
echo '127.0.0.1 mysql' | sudo tee -a /etc/hosts
mkdir -p /tmp/log/argo-e2e
git fetch --tags
KUBECONFIG=~/.kube/config make start PROFILE=$PROFILE E2E_EXECUTOR=docker DEV_IMAGE=true STATIC_FILES=false 2>&1 > /tmp/log/argo-e2e/argo.log &
KUBECONFIG=~/.kube/config make start PROFILE=$PROFILE E2E_EXECUTOR=${{matrix.containerRuntimeExecutor}} DEV_IMAGE=true STATIC_FILES=false 2>&1 > /tmp/log/argo-e2e/argo.log &
- name: Wait for Argo Server to be ready
if: ${{ matrix.test != 'test' }}
env:
GOPATH: /home/runner/go
run: make wait
@@ -74,7 +106,7 @@
GOPATH: /home/runner/go
run: make ${{ matrix.test }}
- name: Upload logs
if: ${{ always() && matrix.test != 'test' }}
if: ${{ always() }}
uses: actions/upload-artifact@v1
with:
name: ${{ matrix.test }}-${{ github.run_id }}-argo.log
@@ -98,8 +130,8 @@ jobs:
go-version: "1.13.12"
- name: Add bins to PATH
run: |
echo "::add-path::/home/runner/go/bin"
echo "::add-path::/usr/local/bin"
echo /home/runner/go/bin >> $GITHUB_PATH
echo /usr/local/bin >> $GITHUB_PATH
- name: Install protoc
run: |
set -eux -o pipefail
2 changes: 1 addition & 1 deletion Makefile
@@ -461,7 +461,7 @@ mysql-cli:

.PHONY: test-e2e
test-e2e:
go test -timeout 15m -count 1 --tags e2e -p 1 --short ./test/e2e
go test -timeout 20m -count 1 --tags e2e -p 1 --short ./test/e2e

.PHONY: test-e2e-cron
test-e2e-cron:
2 changes: 1 addition & 1 deletion docs/empty-dir.md
@@ -1,6 +1,6 @@
# Empty Dir

While by default, the Docker and PNS [workflow executors](workflow-executors.md) can get output artifacts from the base layer (e.g. `/tmp`), neither the Kubelet or K8SAPI exectuors can. Nor are you likely to be able to get output artifacts from the base layer if you run your workflo pods a [security context](workflow-pod-security-context.md).
While by default, the Docker and PNS [workflow executors](workflow-executors.md) can get output artifacts from the base layer (e.g. `/tmp`), neither the Kubelet nor the K8SAPI executor can. It is also unlikely that you can get output artifacts from the base layer if you run your workflow pods with a [security context](workflow-pod-security-context.md).

You can work around this constraint by mounting volumes onto your pod. The easiest way to do this is to use an `emptyDir` volume.

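The workaround this doc describes can be sketched as a workflow that mounts an `emptyDir` volume at the artifact's path. This example is illustrative and not part of the PR; it reuses the `argosay` image's `echo <content> <file>` form shown later in this diff.

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: empty-dir-
spec:
  entrypoint: main
  volumes:
    - name: out
      emptyDir: {}
  templates:
    - name: main
      container:
        image: argoproj/argosay:v2
        # argosay echo writes args[0] to the file at args[1]
        args: ["echo", "hello", "/out/hello.txt"]
        volumeMounts:
          - name: out
            mountPath: /out
      outputs:
        artifacts:
          - name: hello
            path: /out/hello.txt
```

Because the artifact path sits under the `emptyDir` mount rather than the base image layer, this works on the K8SAPI and Kubelet executors, and under a restrictive security context.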
8 changes: 6 additions & 2 deletions test/e2e/fixtures/e2e_suite.go
@@ -35,7 +35,7 @@ const defaultTimeout = 30 * time.Second

type E2ESuite struct {
suite.Suite
Config config.Config
Config *config.Config
Persistence *Persistence
RestConfig *rest.Config
wfClient v1alpha1.WorkflowInterface
@@ -53,11 +53,15 @@ func (s *E2ESuite) SetupSuite() {
s.CheckError(err)
s.KubeClient, err = kubernetes.NewForConfig(s.RestConfig)
s.CheckError(err)
configController := config.NewController(Namespace, "workflow-controller-configmap", s.KubeClient, config.EmptyConfigFunc)
c, err := configController.Get()
s.CheckError(err)
s.Config = c.(*config.Config)
s.wfClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().Workflows(Namespace)
s.wfebClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowEventBindings(Namespace)
s.wfTemplateClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowTemplates(Namespace)
s.cronClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().CronWorkflows(Namespace)
s.Persistence = newPersistence(s.KubeClient)
s.Persistence = newPersistence(s.KubeClient, s.Config)
s.hydrator = hydrator.New(s.Persistence.offloadNodeStatusRepo)
s.cwfTemplateClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().ClusterWorkflowTemplates()
}
8 changes: 1 addition & 7 deletions test/e2e/fixtures/persistence.go
@@ -15,13 +15,7 @@ type Persistence struct {
workflowArchive sqldb.WorkflowArchive
}

func newPersistence(kubeClient kubernetes.Interface) *Persistence {
configController := config.NewController(Namespace, "workflow-controller-configmap", kubeClient, config.EmptyConfigFunc)
v, err := configController.Get()
if err != nil {
panic(err)
}
wcConfig := v.(*config.Config)
func newPersistence(kubeClient kubernetes.Interface, wcConfig *config.Config) *Persistence {
persistence := wcConfig.Persistence
if persistence != nil {
if persistence.PostgreSQL != nil {
10 changes: 7 additions & 3 deletions test/e2e/images/argosay/v2/main/argosay.go
@@ -81,9 +81,13 @@ func echo(args []string) error {
return nil
case 2:
file := args[1]
err := os.MkdirAll(filepath.Dir(file), 0777)
if err != nil {
return err
dir := filepath.Dir(file)
_, err := os.Stat(dir)
if os.IsNotExist(err) {
err := os.MkdirAll(dir, 0777)
if err != nil {
return err
}
}
err = ioutil.WriteFile(file, []byte(args[0]), 0666)
if err != nil {
15 changes: 15 additions & 0 deletions test/e2e/smoke/runasnonroot-workflow.yaml
@@ -0,0 +1,15 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: runasnonroot-
labels:
argo-e2e: true
spec:
entrypoint: main
securityContext:
runAsNonRoot: true
runAsUser: 8737
templates:
- name: main
container:
image: argoproj/argosay:v2
19 changes: 19 additions & 0 deletions test/e2e/smoke_test.go
@@ -12,6 +12,7 @@ import (

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test/e2e/fixtures"
"github.com/argoproj/argo/workflow/common"
)

type SmokeSuite struct {
@@ -32,7 +33,25 @@ func (s *SmokeSuite) TestBasicWorkflow() {
})
}

func (s *SmokeSuite) TestRunAsNonRootWorkflow() {
if s.Config.ContainerRuntimeExecutor == common.ContainerRuntimeExecutorDocker {
s.T().Skip("docker not supported")
}
s.Given().
Workflow("@smoke/runasnonroot-workflow.yaml").
When().
SubmitWorkflow().
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
})
}

func (s *SmokeSuite) TestArtifactPassing() {
if s.Config.ContainerRuntimeExecutor != common.ContainerRuntimeExecutorDocker {
s.T().Skip("non-docker not supported")
}
s.Given().
Workflow("@smoke/artifact-passing.yaml").
When().
47 changes: 47 additions & 0 deletions workflow/executor/common/wait/wait.go
@@ -0,0 +1,47 @@
package wait

import (
"fmt"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/argoproj/argo/workflow/executor/common"
)

func UntilTerminated(kubernetesInterface kubernetes.Interface, namespace, podName, containerID string) error {
log.Infof("Waiting for container %s to be terminated", containerID)
podInterface := kubernetesInterface.CoreV1().Pods(namespace)
Member comment: I like this idea. It is safer than earlier

listOptions := metav1.ListOptions{FieldSelector: "metadata.name=" + podName}
for {
done, err := untilTerminatedAux(podInterface, containerID, listOptions)
if done {
return err
}
}
}

func untilTerminatedAux(podInterface v1.PodInterface, containerID string, listOptions metav1.ListOptions) (bool, error) {
w, err := podInterface.Watch(listOptions)
if err != nil {
return true, fmt.Errorf("could not watch pod: %w", err)
}
defer w.Stop()
for event := range w.ResultChan() {
pod, ok := event.Object.(*corev1.Pod)
if !ok {
return false, apierrors.FromObject(event.Object)
}
for _, s := range pod.Status.ContainerStatuses {
if common.GetContainerID(&s) == containerID && s.State.Terminated != nil {
return true, nil
}
}
listOptions.ResourceVersion = pod.ResourceVersion
}
return true, nil
}
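For context on why `listOptions.ResourceVersion` is carried between iterations above: the API server may close a watch at any time, and the new wait loop re-opens it from the last observed ResourceVersion so a termination event cannot slip through the gap between reconnects. A dependency-free sketch of that resume pattern — the `event`/`openWatch` types here are hypothetical stand-ins for the client-go `watch.Interface` and `corev1.Pod`, not the real API:

```go
package main

import "fmt"

// event is a hypothetical stand-in for a watch event carrying a pod's
// resourceVersion and whether the watched container has terminated.
type event struct {
	resourceVersion string
	terminated      bool
}

// openWatch simulates podInterface.Watch(listOptions): it replays the
// events recorded after the given resourceVersion, then closes the
// channel, like an API server ending a watch connection.
func openWatch(stream []event, fromRV string) <-chan event {
	start := 0
	for i, e := range stream {
		if e.resourceVersion == fromRV {
			start = i + 1
		}
	}
	ch := make(chan event)
	go func() {
		defer close(ch)
		for _, e := range stream[start:] {
			ch <- e
		}
	}()
	return ch
}

// untilTerminated mirrors the PR's loop: whenever the watch channel
// closes, re-open it from the last seen resourceVersion so no events
// are missed, and stop once an event reports termination.
func untilTerminated(stream []event) string {
	rv := ""
	for {
		progressed := false
		for e := range openWatch(stream, rv) {
			progressed = true
			if e.terminated {
				return e.resourceVersion
			}
			rv = e.resourceVersion
		}
		if !progressed {
			return "" // stream exhausted without a termination event
		}
	}
}

func main() {
	stream := []event{
		{"101", false},
		{"102", false},
		{"103", true},
	}
	fmt.Println("terminated at resourceVersion", untilTerminated(stream))
	// prints: terminated at resourceVersion 103
}
```

The real implementation additionally surfaces watch errors to the caller and matches a specific container ID inside each pod's status; this sketch models only the reconnect-and-resume behaviour.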
5 changes: 0 additions & 5 deletions workflow/executor/k8sapi/client.go
@@ -5,7 +5,6 @@ import (
"fmt"
"io"
"syscall"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -79,10 +78,6 @@ func (c *k8sAPIClient) GetContainerStatus(containerID string) (*corev1.Pod, *cor
return nil, nil, errors.New(errors.CodeNotFound, fmt.Sprintf("containerID %q is not found in the pod %s", containerID, c.podName))
}

func (c *k8sAPIClient) waitForTermination(containerID string, timeout time.Duration) error {
return execcommon.WaitForTermination(c, containerID, timeout)
}

func (c *k8sAPIClient) KillContainer(pod *corev1.Pod, container *corev1.ContainerStatus, sig syscall.Signal) error {
command := []string{"/bin/sh", "-c", fmt.Sprintf("kill -%d 1", sig)}
exec, err := common.ExecPodContainer(c.config, c.namespace, c.podName, container.Name, false, true, command...)
4 changes: 2 additions & 2 deletions workflow/executor/k8sapi/k8sapi.go
@@ -9,6 +9,7 @@ import (
restclient "k8s.io/client-go/rest"

"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/workflow/executor/common/wait"
)

type K8sAPIExecutor struct {
@@ -60,8 +61,7 @@ func (k *K8sAPIExecutor) WaitInit() error {

// Wait for the container to complete
func (k *K8sAPIExecutor) Wait(containerID string) error {
log.Infof("Waiting for container %s to complete", containerID)
return k.client.waitForTermination(containerID, 0)
return wait.UntilTerminated(k.client.clientset, k.client.namespace, k.client.podName, containerID)
}

// Kill kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
9 changes: 4 additions & 5 deletions workflow/executor/pns/pns.go
@@ -22,6 +22,7 @@ import (
"github.com/argoproj/argo/util/archive"
"github.com/argoproj/argo/workflow/common"
execcommon "github.com/argoproj/argo/workflow/executor/common"
"github.com/argoproj/argo/workflow/executor/common/wait"
os_specific "github.com/argoproj/argo/workflow/executor/os-specific"
)

@@ -40,8 +41,6 @@ type PNSExecutor struct {

// thisPID is the pid of this process
thisPID int
// mainPID holds the main container's pid
mainPID int
// mainFS holds a file descriptor to the main filesystem, allowing the executor to access the
// filesystem after the main process exited
mainFS *os.File
@@ -163,16 +162,16 @@ func (p *PNSExecutor) WaitInit() error {
func (p *PNSExecutor) Wait(containerID string) error {
mainPID, err := p.getContainerPID(containerID)
if err != nil {
log.Warnf("Failed to get main PID: %v", err)
if !p.hasOutputs {
log.Warnf("Ignoring wait failure: %v. Process assumed to have completed", err)
return nil
}
return err
return wait.UntilTerminated(p.clientset, p.namespace, p.podName, containerID)
}
log.Infof("Main pid identified as %d", mainPID)
p.mainPID = mainPID
for pid, f := range p.pidFileHandles {
if pid == p.mainPID {
if pid == mainPID {
log.Info("Successfully secured file handle on main container root filesystem")
p.mainFS = &f.file
} else {
4 changes: 2 additions & 2 deletions workflow/validate/validate.go
@@ -1022,7 +1022,7 @@ func (ctx *templateValidationCtx) validateBaseImageOutputs(tmpl *wfv1.Template)
// docker executor supports all modes of artifact outputs
case common.ContainerRuntimeExecutorPNS:
// pns supports copying from the base image, but only if there is no volume mount underneath it
errMsg := "pns executor does not support outputs from base image layer with volume mounts. must use emptyDir"
errMsg := "pns executor does not support outputs from base image layer with volume mounts. Use an emptyDir: https://argoproj.github.io/argo/empty-dir/"
for _, out := range tmpl.Outputs.Artifacts {
if common.FindOverlappingVolume(tmpl, out.Path) == nil {
// output is in the base image layer. need to verify there are no volume mounts under it
@@ -1045,7 +1045,7 @@
}
case common.ContainerRuntimeExecutorK8sAPI, common.ContainerRuntimeExecutorKubelet:
// for kubelet/k8s fail validation if we detect artifact is copied from base image layer
errMsg := fmt.Sprintf("%s executor does not support outputs from base image layer. must use emptyDir", ctx.ContainerRuntimeExecutor)
errMsg := fmt.Sprintf("%s executor does not support outputs from base image layer. Use an emptyDir: https://argoproj.github.io/argo/empty-dir/", ctx.ContainerRuntimeExecutor)
for _, out := range tmpl.Outputs.Artifacts {
if common.FindOverlappingVolume(tmpl, out.Path) == nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.artifacts.%s: %s", tmpl.Name, out.Name, errMsg)