diff --git a/pkg/skaffold/build/cluster/kaniko.go b/pkg/skaffold/build/cluster/kaniko.go index 09fdb7cc7c8..e0ebd59b961 100644 --- a/pkg/skaffold/build/cluster/kaniko.go +++ b/pkg/skaffold/build/cluster/kaniko.go @@ -75,7 +75,7 @@ func (b *Builder) runKanikoBuild(ctx context.Context, out io.Writer, artifact *l return "", errors.Wrap(err, "modifying kaniko pod") } - waitForLogs := streamLogs(out, pod.Name, pods) + waitForLogs := streamLogs(ctx, out, pod.Name, pods) err = kubernetes.WaitForPodSucceeded(ctx, pods, pod.Name, b.timeout) waitForLogs() diff --git a/pkg/skaffold/build/cluster/logs.go b/pkg/skaffold/build/cluster/logs.go index acec4c4367e..43e70489821 100644 --- a/pkg/skaffold/build/cluster/logs.go +++ b/pkg/skaffold/build/cluster/logs.go @@ -17,6 +17,9 @@ limitations under the License. package cluster import ( + "bufio" + "context" + "fmt" "io" "sync" "sync/atomic" @@ -40,7 +43,7 @@ func logLevel() logrus.Level { return level } -func streamLogs(out io.Writer, name string, pods corev1.PodInterface) func() { +func streamLogs(ctx context.Context, out io.Writer, name string, pods corev1.PodInterface) func() { var wg sync.WaitGroup wg.Add(1) @@ -60,9 +63,20 @@ func streamLogs(out io.Writer, name string, pods corev1.PodInterface) func() { continue } - w, _ := io.Copy(out, r) - atomic.AddInt64(&written, w) - return + scanner := bufio.NewScanner(r) + for { + select { + case <-ctx.Done(): + return // The build was cancelled + default: + if !scanner.Scan() { + return // No more logs + } + + fmt.Fprintln(out, scanner.Text()) + atomic.AddInt64(&written, 1) + } + } } }()