From dbebb59f17259b4edcc9e411b7b7f1396eaacccc Mon Sep 17 00:00:00 2001 From: David Gageot Date: Wed, 19 Jun 2019 17:04:15 +0200 Subject: [PATCH 1/8] =?UTF-8?q?Don=E2=80=99t=20leak=20go=20routines?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make sure streamRequest() exits. Signed-off-by: David Gageot --- pkg/skaffold/kubernetes/log.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index e59fe12851b..3f1f17e6be8 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -156,7 +156,10 @@ func (a *LogAggregator) streamContainerLogs(ctx context.Context, pod *v1.Pod, co tr, tw := io.Pipe() cmd := exec.CommandContext(ctx, "kubectl", "logs", sinceSeconds, "-f", pod.Name, "-c", container.Name, "--namespace", pod.Namespace) cmd.Stdout = tw - go util.RunCmd(cmd) + go func() { + util.RunCmd(cmd) + tw.Close() + }() color := a.colorPicker.Pick(pod) prefix := prefix(pod, container) From f6679c9db904d0cc1bd0555c369ab4b79aa109e7 Mon Sep 17 00:00:00 2001 From: David Gageot Date: Wed, 19 Jun 2019 17:05:17 +0200 Subject: [PATCH 2/8] Never reprint the log for a given container Signed-off-by: David Gageot --- pkg/skaffold/kubernetes/log.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index 3f1f17e6be8..90690e8e70e 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -167,7 +167,6 @@ func (a *LogAggregator) streamContainerLogs(ctx context.Context, pod *v1.Pod, co if err := a.streamRequest(ctx, color, prefix, tr); err != nil { logrus.Errorf("streaming request %s", err) } - a.trackedContainers.remove(container.ContainerID) }() } @@ -243,12 +242,6 @@ func (t *trackedContainers) add(id string) bool { return alreadyTracked } -func (t *trackedContainers) remove(id string) { - t.Lock() - delete(t.ids, id) - t.Unlock() -} - // PodSelector is used to choose which pods to log. type PodSelector interface { Select(pod *v1.Pod) bool @@ -274,13 +267,6 @@ func (l *ImageList) Add(image string) { l.Unlock() } -// Remove removes an image from the list. -func (l *ImageList) Remove(image string) { - l.Lock() - delete(l.names, image) - l.Unlock() -} - // Select returns true if one of the pod's images is in the list. func (l *ImageList) Select(pod *v1.Pod) bool { l.RLock() From b30552ec870e931f9362715a04f14f031809e65e Mon Sep 17 00:00:00 2001 From: David Gageot Date: Thu, 20 Jun 2019 16:04:15 +0200 Subject: [PATCH 3/8] Simplify code Signed-off-by: David Gageot --- pkg/skaffold/kubernetes/log.go | 73 +++++++++++++++------------------- 1 file changed, 33 insertions(+), 40 deletions(-) diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index 90690e8e70e..13ed841015a 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -26,13 +26,12 @@ import ( "sync/atomic" "time" + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/color" + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/util" "github.com/pkg/errors" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/watch" - - "github.com/GoogleContainerTools/skaffold/pkg/skaffold/color" - "github.com/GoogleContainerTools/skaffold/pkg/skaffold/util" ) // Client is for tests @@ -104,21 +103,21 @@ func (a *LogAggregator) Start(ctx context.Context) error { continue } - for _, container := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { - if container.ContainerID == "" { - if container.State.Waiting != nil && container.State.Waiting.Message != "" { - color.Red.Fprintln(a.output, container.State.Waiting.Message) + for _, c := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { + if c.ContainerID == "" { + if c.State.Waiting != nil && c.State.Waiting.Message != "" { + color.Red.Fprintln(a.output, c.State.Waiting.Message) } continue } - if container.State.Terminated != nil { - color.Purple.Fprintln(a.output, container.State.Terminated.Message) + if c.State.Terminated != nil { + color.Purple.Fprintln(a.output, c.State.Terminated.Message) continue } - if !a.trackedContainers.add(container.ContainerID) { - go a.streamContainerLogs(cancelCtx, pod, container) + if !a.trackedContainers.add(c.ContainerID) { + go a.streamContainerLogs(cancelCtx, pod, c) } } } @@ -161,13 +160,18 @@ func (a *LogAggregator) streamContainerLogs(ctx context.Context, pod *v1.Pod, co tw.Close() }() - color := a.colorPicker.Pick(pod) + headerColor := a.colorPicker.Pick(pod) prefix := prefix(pod, container) - go func() { - if err := a.streamRequest(ctx, color, prefix, tr); err != nil { - logrus.Errorf("streaming request %s", err) - } - }() + if err := a.streamRequest(ctx, headerColor, prefix, tr); err != nil { + logrus.Errorf("streaming request %s", err) + } +} + +func (a *LogAggregator) printLogLine(headerColor color.Color, prefix, text string) { + if !a.IsMuted() { + headerColor.Fprintf(a.output, "%s ", prefix) + fmt.Fprint(a.output, text) + } } func prefix(pod *v1.Pod, container v1.ContainerStatus) string { @@ -177,38 +181,27 @@ func prefix(pod *v1.Pod, container v1.ContainerStatus) string { return fmt.Sprintf("[%s]", container.Name) } -func (a *LogAggregator) streamRequest(ctx context.Context, headerColor color.Color, header string, rc io.Reader) error { +func (a *LogAggregator) streamRequest(ctx context.Context, headerColor color.Color, prefix string, rc io.Reader) error { r := bufio.NewReader(rc) for { select { case <-ctx.Done(): - logrus.Infof("%s interrupted", header) + logrus.Infof("%s interrupted", prefix) return nil default: - } - - // Read up to newline - line, err := r.ReadBytes('\n') - if err == io.EOF { - break - } - if err != nil { - return errors.Wrap(err, "reading bytes from log stream") - } - - if a.IsMuted() { - continue - } + // Read up to newline + line, err := r.ReadString('\n') + if err == io.EOF { + logrus.Infof("%s exited", prefix) + return nil + } + if err != nil { + return errors.Wrap(err, "reading bytes from log stream") + } - if _, err := headerColor.Fprintf(a.output, "%s ", header); err != nil { - return errors.Wrap(err, "writing pod prefix header to out") - } - if _, err := fmt.Fprint(a.output, string(line)); err != nil { - return errors.Wrap(err, "writing pod log to out") + a.printLogLine(headerColor, prefix, line) } } - logrus.Infof("%s exited", header) - return nil } // Mute mutes the logs. From 7577a46997fe2ae4a7462370506640e519a3f442 Mon Sep 17 00:00:00 2001 From: David Gageot Date: Sat, 22 Jun 2019 10:16:23 +0200 Subject: [PATCH 4/8] Handle init containers first Signed-off-by: David Gageot --- pkg/skaffold/kubernetes/log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index 13ed841015a..bf7d9ec68cb 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -103,7 +103,7 @@ func (a *LogAggregator) Start(ctx context.Context) error { continue } - for _, c := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { + for _, c := range append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) { if c.ContainerID == "" { if c.State.Waiting != nil && c.State.Waiting.Message != "" { color.Red.Fprintln(a.output, c.State.Waiting.Message) From b7f07723c66f2421414c5c0b58515f7b69fabcf0 Mon Sep 17 00:00:00 2001 From: David Gageot Date: Sat, 22 Jun 2019 10:17:18 +0200 Subject: [PATCH 5/8] Handle every type of events Signed-off-by: David Gageot --- pkg/skaffold/kubernetes/log.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index bf7d9ec68cb..dab6c76f3bb 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -90,10 +90,6 @@ func (a *LogAggregator) Start(ctx context.Context) error { return } - if evt.Type != watch.Added && evt.Type != watch.Modified { - continue - } - pod, ok := evt.Object.(*v1.Pod) if !ok { continue From a59f0507f4ae605d10fd5066fbb8ecd4a2e2ac5c Mon Sep 17 00:00:00 2001 From: David Gageot Date: Sat, 22 Jun 2019 10:32:32 +0200 Subject: [PATCH 6/8] Notify the user that the container logs ended Signed-off-by: David Gageot --- pkg/skaffold/kubernetes/log.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index dab6c76f3bb..4cd04fc6658 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -107,11 +107,6 @@ func (a *LogAggregator) Start(ctx context.Context) error { continue } - if c.State.Terminated != nil { - color.Purple.Fprintln(a.output, c.State.Terminated.Message) - continue - } - if !a.trackedContainers.add(c.ContainerID) { go a.streamContainerLogs(cancelCtx, pod, c) } @@ -188,7 +183,7 @@ func (a *LogAggregator) streamRequest(ctx context.Context, headerColor color.Col // Read up to newline line, err := r.ReadString('\n') if err == io.EOF { - logrus.Infof("%s exited", prefix) + a.printLogLine(headerColor, prefix, "\n") return nil } if err != nil { From 580078b2efc4d15cfb0f45ad324b565ae09d76d6 Mon Sep 17 00:00:00 2001 From: David Gageot Date: Mon, 24 Jun 2019 18:18:53 +0200 Subject: [PATCH 7/8] Also match init containers when selecting pods Signed-off-by: David Gageot --- pkg/skaffold/kubernetes/log.go | 2 +- pkg/skaffold/kubernetes/log_test.go | 39 +++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index 4cd04fc6658..c2b9948a785 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -256,7 +256,7 @@ func (l *ImageList) Select(pod *v1.Pod) bool { l.RLock() defer l.RUnlock() - for _, container := range pod.Spec.Containers { + for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { if l.names[container.Image] { return true } diff --git a/pkg/skaffold/kubernetes/log_test.go b/pkg/skaffold/kubernetes/log_test.go index 7d0ddcf2931..8ecae0e6c12 100644 --- a/pkg/skaffold/kubernetes/log_test.go +++ b/pkg/skaffold/kubernetes/log_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/GoogleContainerTools/skaffold/testutil" + v1 "k8s.io/api/core/v1" ) func TestSinceSeconds(t *testing.T) { @@ -49,3 +50,41 @@ func TestSinceSeconds(t *testing.T) { }) } } + +func TestSelect(t *testing.T) { + var tests = []struct { + description string + images []string + podSpec v1.PodSpec + expectedMatch bool + }{ + { + description: "match container", + podSpec: v1.PodSpec{Containers: []v1.Container{{Image: "image1"}}}, + expectedMatch: true, + }, + { + description: "match init container", + podSpec: v1.PodSpec{InitContainers: []v1.Container{{Image: "image2"}}}, + expectedMatch: true, + }, + { + description: "no match", + podSpec: v1.PodSpec{Containers: []v1.Container{{Image: "image3"}}}, + expectedMatch: false, + }, + } + for _, test := range tests { + testutil.Run(t, test.description, func(t *testutil.T) { + list := NewImageList() + list.Add("image1") + list.Add("image2") + + selected := list.Select(&v1.Pod{ + Spec: test.podSpec, + }) + + t.CheckDeepEqual(test.expectedMatch, selected) + }) + } +} From f18147508c127ae3de051fb64cbd09b730208297 Mon Sep 17 00:00:00 2001 From: David Gageot Date: Wed, 26 Jun 2019 13:10:55 +0200 Subject: [PATCH 8/8] =?UTF-8?q?Don=E2=80=99t=20print=20logs=20while=20depl?= =?UTF-8?q?oy=20is=20running.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit But once deploy is done, print the logs up the just before deploy was started. Signed-off-by: David Gageot --- pkg/skaffold/kubernetes/log.go | 9 ++++++--- pkg/skaffold/runner/build_deploy.go | 11 ++++++++--- pkg/skaffold/runner/dev.go | 16 ++++++++++------ 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index c2b9948a785..f1d21ca1e96 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -46,7 +46,7 @@ type LogAggregator struct { colorPicker ColorPicker muted int32 - startTime time.Time + sinceTime time.Time cancel context.CancelFunc trackedContainers trackedContainers } @@ -64,12 +64,15 @@ func NewLogAggregator(out io.Writer, baseImageNames []string, podSelector PodSel } } +func (a *LogAggregator) SetSince(t time.Time) { + a.sinceTime = t +} + // Start starts a logger that listens to pods and tail their logs // if they are matched by the `podSelector`. func (a *LogAggregator) Start(ctx context.Context) error { cancelCtx, cancel := context.WithCancel(ctx) a.cancel = cancel - a.startTime = time.Now() aggregate := make(chan watch.Event) stopWatchers, err := AggregatePodWatcher(a.namespaces, aggregate) @@ -141,7 +144,7 @@ func (a *LogAggregator) streamContainerLogs(ctx context.Context, pod *v1.Pod, co // In theory, it's more precise to use --since-time='' but there can be a time // difference between the user's machine and the server. // So we use --since=Xs and round up to the nearest second to not lose any log. - sinceSeconds := fmt.Sprintf("--since=%ds", sinceSeconds(time.Since(a.startTime))) + sinceSeconds := fmt.Sprintf("--since=%ds", sinceSeconds(time.Since(a.sinceTime))) tr, tw := io.Pipe() cmd := exec.CommandContext(ctx, "kubectl", "logs", sinceSeconds, "-f", pod.Name, "-c", container.Name, "--namespace", pod.Namespace) diff --git a/pkg/skaffold/runner/build_deploy.go b/pkg/skaffold/runner/build_deploy.go index 5c718e77c91..430b21fb91f 100644 --- a/pkg/skaffold/runner/build_deploy.go +++ b/pkg/skaffold/runner/build_deploy.go @@ -19,6 +19,7 @@ package runner import ( "context" "io" + "time" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/build" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest" @@ -79,14 +80,18 @@ func (r *SkaffoldRunner) DeployAndLog(ctx context.Context, out io.Writer, artifa logger := r.newLoggerForImages(out, imageNames) defer logger.Stop() - if err := logger.Start(ctx); err != nil { - return errors.Wrap(err, "starting logger") - } + // Logs should be retrieve up to just before the deploy + logger.SetSince(time.Now()) if err := r.Deploy(ctx, out, artifacts); err != nil { return err } + // Start printing the logs after deploy is finished + if err := logger.Start(ctx); err != nil { + return errors.Wrap(err, "starting logger") + } + <-ctx.Done() return nil diff --git a/pkg/skaffold/runner/dev.go b/pkg/skaffold/runner/dev.go index 730ddd02ef1..e07bfa0a106 100644 --- a/pkg/skaffold/runner/dev.go +++ b/pkg/skaffold/runner/dev.go @@ -19,6 +19,7 @@ package runner import ( "context" "io" + "time" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/color" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes/portforward" @@ -133,18 +134,21 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la return errors.Wrap(err, "exiting dev mode because first build failed") } - // Start logs - if r.runCtx.Opts.TailDev { - if err := logger.Start(ctx); err != nil { - return errors.Wrap(err, "starting logger") - } - } + // Logs should be retrieve up to just before the deploy + logger.SetSince(time.Now()) // First deploy if err := r.Deploy(ctx, out, r.builds); err != nil { return errors.Wrap(err, "exiting dev mode because first deploy failed") } + // Start printing the logs after deploy is finished + if r.runCtx.Opts.TailDev { + if err := logger.Start(ctx); err != nil { + return errors.Wrap(err, "starting logger") + } + } + // Forward ports if err := forwarderManager.Start(ctx); err != nil { return errors.Wrap(err, "starting forwarder manager")