Skip to content

Commit

Permalink
fix: logic to interrupt a k8sjob's logs as soon as it fails (#8847) (#8871)
Browse files Browse the repository at this point in the history

* fix: logic to interrupt a k8sjob's logs as soon as it fails, and change the delete order so that the job is now deleted before its pods

* test: integration tests to check when a log should not appear in the output

(cherry picked from commit c7c72a1)
  • Loading branch information
renzodavid9 authored Jun 9, 2023
1 parent bb0ed01 commit 1c990f2
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 43 deletions.
36 changes: 22 additions & 14 deletions integration/exec_k8s_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,26 @@ import (

func TestExec_K8SActions(t *testing.T) {
tests := []struct {
description string
action string
shouldErr bool
envFile string
expectedMsgs []string
description string
action string
shouldErr bool
envFile string
expectedMsgs []string
notExpectedLogs []string
}{
{
description: "fail due to action timeout",
action: "action-fail-timeout",
shouldErr: true,
expectedMsgs: []string{"context deadline exceeded"},
description: "fail due to action timeout",
action: "action-fail-timeout",
shouldErr: true,
expectedMsgs: []string{"context deadline exceeded"},
notExpectedLogs: []string{"[task1] bye-from-task1"},
},
{
description: "fail with fail fast",
action: "action-fail-fast-logs",
shouldErr: true,
expectedMsgs: []string{`error in task4l job execution, job failed`},
description: "fail with fail fast",
action: "action-fail-fast",
shouldErr: true,
expectedMsgs: []string{`error in task4 job execution, job failed`},
notExpectedLogs: []string{"[task3] bye-from-task3"},
},
{
description: "fail with fail safe",
Expand Down Expand Up @@ -79,9 +82,14 @@ func TestExec_K8SActions(t *testing.T) {

out, err := skaffold.Exec(args...).InDir("testdata/custom-actions-k8s").RunWithCombinedOutput(t.T)
t.CheckError(test.shouldErr, err)
logs := string(out)

for _, expectedMsg := range test.expectedMsgs {
t.CheckContains(expectedMsg, string(out))
t.CheckContains(expectedMsg, logs)
}

for _, nel := range test.notExpectedLogs {
testutil.CheckNotContains(t.T, nel, logs)
}
})
}
Expand Down
19 changes: 0 additions & 19 deletions integration/testdata/custom-actions-k8s/skaffold.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,6 @@ customActions:
- name: FOO
value: from-task4

- name: action-fail-fast-logs
executionMode:
kubernetesCluster: {}
containers:
- name: task3l
image: alpine:3.15.4
command: ["/bin/sh"]
args: ["-c", "echo hello-$FOO && sleep 1 && echo bye-$FOO"]
env:
- name: FOO
value: from-task3l
- name: task4l
image: alpine:3.15.4
command: ["/bin/sh"]
args: ["-c", "echo hello-$FOO && exit 1"]
env:
- name: FOO
value: from-task4l

- name: action-fail-safe
executionMode:
kubernetesCluster: {}
Expand Down
11 changes: 4 additions & 7 deletions pkg/skaffold/actions/k8sjob/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (t Task) Exec(ctx context.Context, out io.Writer) error {
}

if err = t.watchStatus(ctx, t.jobManifest, jm); err != nil {
t.execEnv.logger.CancelJobLogger(t.jobManifest.Name)
t.deleteJob(context.TODO(), t.jobManifest.Name, jm)
}

Expand Down Expand Up @@ -184,22 +185,18 @@ func (t Task) deleteJob(ctx context.Context, jobName string, jobsManager typesba
return errors.Wrap(err, fmt.Sprintf("deleting %v job", jobName))
}

if err = t.deleteJobPod(ctx, jobName); err != nil {
return err
}

err = t.withRetryablePoll(ctx, func(ctx context.Context) error {
return jobsManager.Delete(ctx, jobName, v1.DeleteOptions{
GracePeriodSeconds: util.Ptr[int64](0),
PropagationPolicy: util.Ptr(v1.DeletePropagationForeground),
})
})

if apierrs.IsNotFound(err) {
err = nil
if err != nil && !apierrs.IsNotFound(err) {
return err
}

return err
return t.deleteJobPod(ctx, jobName)
}

func (t Task) deleteJobPod(ctx context.Context, jobName string) error {
Expand Down
17 changes: 14 additions & 3 deletions pkg/skaffold/k8sjob/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Logger struct {
childThreadEmitLogs AtomicBool
muted int32
kubeContext string
// jobLoggerCancelers maps each job ID to the cancel function of that job's log-streaming context.
jobLoggerCancelers sync.Map
}

// AtomicBool is a boolean flag stored as an int32.
// NOTE(review): presumably read and written via sync/atomic by its methods,
// which are not visible here — confirm.
type AtomicBool struct{ flag int32 }
Expand Down Expand Up @@ -127,7 +129,9 @@ func (l *Logger) Start(ctx context.Context, out io.Writer) error {
return
case info := <-l.tracker.Notifier():
id, namespace := info[0], info[1]
go l.streamLogsFromKubernetesJob(ctx, id, namespace, false)
jobLogCancelCtx, jobLogCancel := context.WithCancel(ctx)
l.jobLoggerCancelers.Store(id, jobLogCancel)
go l.streamLogsFromKubernetesJob(jobLogCancelCtx, id, namespace, false)
}
}
}()
Expand Down Expand Up @@ -156,7 +160,7 @@ func (l *Logger) streamLogsFromKubernetesJob(ctx context.Context, id, namespace
}
}
var podName string
w, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(),
w, err := clientset.CoreV1().Pods(namespace).Watch(ctx,
metav1.ListOptions{
LabelSelector: labels.Set(map[string]string{"job-name": id, "skaffold.dev/run-id": l.labeller.GetRunID()}).String(),
})
Expand Down Expand Up @@ -191,7 +195,7 @@ func (l *Logger) streamLogsFromKubernetesJob(ctx context.Context, id, namespace

// Stream the logs
req := clientset.CoreV1().Pods(namespace).GetLogs(podName, podLogOptions)
podLogs, err := req.Stream(context.TODO())
podLogs, err := req.Stream(ctx)
if err != nil {
return false, nil
}
Expand Down Expand Up @@ -219,6 +223,7 @@ func (l *Logger) Stop() {
return
}
l.childThreadEmitLogs.Set(false)
l.wg.Wait()

l.hadLogsOutput.Range(func(key, value interface{}) bool {
if !value.(bool) {
Expand Down Expand Up @@ -262,3 +267,9 @@ func (l *Logger) IsMuted() bool {
// SetSince does nothing. A fresh Job is always created on Verify, so the
// supplied timestamp is ignored.
func (l *Logger) SetSince(_ time.Time) {}

// CancelJobLogger stops the log stream of the job identified by jobID.
//
// It looks up the cancel function registered for jobID in jobLoggerCancelers
// and invokes it, cancelling the context the job's log stream was started
// with. The entry is removed from the map in the same step, so cancel
// functions of already-cancelled jobs are not retained for the lifetime of
// the Logger. Calling it for a job with no registered cancel function (or one
// that was already cancelled) is a no-op.
func (l *Logger) CancelJobLogger(jobID string) {
	// LoadAndDelete fetches and drops the entry atomically, so concurrent
	// callers cannot both observe the same stale entry.
	stored, found := l.jobLoggerCancelers.LoadAndDelete(jobID)
	if !found {
		return
	}

	// Only context.CancelFunc values are stored in the map; the checked
	// assertion just avoids a panic if that ever changes.
	if cancel, ok := stored.(context.CancelFunc); ok {
		cancel()
	}
}

0 comments on commit 1c990f2

Please sign in to comment.