diff --git a/integration/exec_k8s_actions_test.go b/integration/exec_k8s_actions_test.go index e8b88830ad9..44cac95a512 100644 --- a/integration/exec_k8s_actions_test.go +++ b/integration/exec_k8s_actions_test.go @@ -29,23 +29,26 @@ import ( func TestExec_K8SActions(t *testing.T) { tests := []struct { - description string - action string - shouldErr bool - envFile string - expectedMsgs []string + description string + action string + shouldErr bool + envFile string + expectedMsgs []string + notExpectedLogs []string }{ { - description: "fail due to action timeout", - action: "action-fail-timeout", - shouldErr: true, - expectedMsgs: []string{"context deadline exceeded"}, + description: "fail due to action timeout", + action: "action-fail-timeout", + shouldErr: true, + expectedMsgs: []string{"context deadline exceeded"}, + notExpectedLogs: []string{"[task1] bye-from-task1"}, }, { - description: "fail with fail fast", - action: "action-fail-fast-logs", - shouldErr: true, - expectedMsgs: []string{`error in task4l job execution, job failed`}, + description: "fail with fail fast", + action: "action-fail-fast", + shouldErr: true, + expectedMsgs: []string{`error in task4 job execution, job failed`}, + notExpectedLogs: []string{"[task3] bye-from-task3"}, }, { description: "fail with fail safe", @@ -79,9 +82,14 @@ func TestExec_K8SActions(t *testing.T) { out, err := skaffold.Exec(args...).InDir("testdata/custom-actions-k8s").RunWithCombinedOutput(t.T) t.CheckError(test.shouldErr, err) + logs := string(out) for _, expectedMsg := range test.expectedMsgs { - t.CheckContains(expectedMsg, string(out)) + t.CheckContains(expectedMsg, logs) + } + + for _, nel := range test.notExpectedLogs { + testutil.CheckNotContains(t.T, nel, logs) } }) } diff --git a/integration/testdata/custom-actions-k8s/skaffold.yaml b/integration/testdata/custom-actions-k8s/skaffold.yaml index fee9eedf12d..edea2796ab5 100644 --- a/integration/testdata/custom-actions-k8s/skaffold.yaml +++ b/integration/testdata/custom-actions-k8s/skaffold.yaml @@ -39,25 +39,6 @@ customActions: - name: FOO value: from-task4 - - name: action-fail-fast-logs - executionMode: - kubernetesCluster: {} - containers: - - name: task3l - image: alpine:3.15.4 - command: ["/bin/sh"] - args: ["-c", "echo hello-$FOO && sleep 1 && echo bye-$FOO"] - env: - - name: FOO - value: from-task3l - - name: task4l - image: alpine:3.15.4 - command: ["/bin/sh"] - args: ["-c", "echo hello-$FOO && exit 1"] - env: - - name: FOO - value: from-task4l - - name: action-fail-safe executionMode: kubernetesCluster: {} diff --git a/pkg/skaffold/actions/k8sjob/task.go b/pkg/skaffold/actions/k8sjob/task.go index 751e79c3c6c..281e08b3a84 100644 --- a/pkg/skaffold/actions/k8sjob/task.go +++ b/pkg/skaffold/actions/k8sjob/task.go @@ -117,6 +117,7 @@ func (t Task) Exec(ctx context.Context, out io.Writer) error { } if err = t.watchStatus(ctx, t.jobManifest, jm); err != nil { + t.execEnv.logger.CancelJobLogger(t.jobManifest.Name) t.deleteJob(context.TODO(), t.jobManifest.Name, jm) } @@ -184,10 +185,6 @@ func (t Task) deleteJob(ctx context.Context, jobName string, jobsManager typesba return errors.Wrap(err, fmt.Sprintf("deleting %v job", jobName)) } - if err = t.deleteJobPod(ctx, jobName); err != nil { - return err - } - err = t.withRetryablePoll(ctx, func(ctx context.Context) error { return jobsManager.Delete(ctx, jobName, v1.DeleteOptions{ GracePeriodSeconds: util.Ptr[int64](0), @@ -195,11 +192,11 @@ func (t Task) deleteJob(ctx context.Context, jobName string, jobsManager typesba }) }) - if apierrs.IsNotFound(err) { - err = nil + if err != nil && !apierrs.IsNotFound(err) { + return err } - return err + return t.deleteJobPod(ctx, jobName) } func (t Task) deleteJobPod(ctx context.Context, jobName string) error { diff --git a/pkg/skaffold/k8sjob/logger/log.go b/pkg/skaffold/k8sjob/logger/log.go index d721647e5c0..0f38e2e356d 100644 --- a/pkg/skaffold/k8sjob/logger/log.go +++ b/pkg/skaffold/k8sjob/logger/log.go @@ -49,6 +49,8 @@ type Logger struct { childThreadEmitLogs AtomicBool muted int32 kubeContext string + // Map to store cancel functions per each job. + jobLoggerCancelers sync.Map } type AtomicBool struct{ flag int32 } @@ -127,7 +129,9 @@ func (l *Logger) Start(ctx context.Context, out io.Writer) error { return case info := <-l.tracker.Notifier(): id, namespace := info[0], info[1] - go l.streamLogsFromKubernetesJob(ctx, id, namespace, false) + jobLogCancelCtx, jobLogCancel := context.WithCancel(ctx) + l.jobLoggerCancelers.Store(id, jobLogCancel) + go l.streamLogsFromKubernetesJob(jobLogCancelCtx, id, namespace, false) } } }() @@ -156,7 +160,7 @@ func (l *Logger) streamLogsFromKubernetesJob(ctx context.Context, id, namespace } } var podName string - w, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), + w, err := clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{ LabelSelector: labels.Set(map[string]string{"job-name": id, "skaffold.dev/run-id": l.labeller.GetRunID()}).String(), }) @@ -191,7 +195,7 @@ func (l *Logger) streamLogsFromKubernetesJob(ctx context.Context, id, namespace // Stream the logs req := clientset.CoreV1().Pods(namespace).GetLogs(podName, podLogOptions) - podLogs, err := req.Stream(context.TODO()) + podLogs, err := req.Stream(ctx) if err != nil { return false, nil } @@ -219,6 +223,7 @@ func (l *Logger) Stop() { return } l.childThreadEmitLogs.Set(false) + l.wg.Wait() l.hadLogsOutput.Range(func(key, value interface{}) bool { if !value.(bool) { @@ -262,3 +267,9 @@ func (l *Logger) IsMuted() bool { func (l *Logger) SetSince(time.Time) { // we always create a new Job on Verify, so this is a noop. } + +func (l *Logger) CancelJobLogger(jobID string) { + if cancelJobLogger, found := l.jobLoggerCancelers.Load(jobID); found { + cancelJobLogger.(context.CancelFunc)() + } +}