diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index e59fe12851b..f1d21ca1e96 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -26,13 +26,12 @@ import ( "sync/atomic" "time" + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/color" + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/util" "github.com/pkg/errors" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/watch" - - "github.com/GoogleContainerTools/skaffold/pkg/skaffold/color" - "github.com/GoogleContainerTools/skaffold/pkg/skaffold/util" ) // Client is for tests @@ -47,7 +46,7 @@ type LogAggregator struct { colorPicker ColorPicker muted int32 - startTime time.Time + sinceTime time.Time cancel context.CancelFunc trackedContainers trackedContainers } @@ -65,12 +64,15 @@ func NewLogAggregator(out io.Writer, baseImageNames []string, podSelector PodSel } } +func (a *LogAggregator) SetSince(t time.Time) { + a.sinceTime = t +} + // Start starts a logger that listens to pods and tail their logs // if they are matched by the `podSelector`. func (a *LogAggregator) Start(ctx context.Context) error { cancelCtx, cancel := context.WithCancel(ctx) a.cancel = cancel - a.startTime = time.Now() aggregate := make(chan watch.Event) stopWatchers, err := AggregatePodWatcher(a.namespaces, aggregate) @@ -91,10 +93,6 @@ func (a *LogAggregator) Start(ctx context.Context) error { return } - if evt.Type != watch.Added && evt.Type != watch.Modified { - continue - } - pod, ok := evt.Object.(*v1.Pod) if !ok { continue @@ -104,21 +102,16 @@ func (a *LogAggregator) Start(ctx context.Context) error { continue } - for _, container := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { - if container.ContainerID == "" { - if container.State.Waiting != nil && container.State.Waiting.Message != "" { - color.Red.Fprintln(a.output, container.State.Waiting.Message) + for _, c := range append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) { + if c.ContainerID == "" { + if c.State.Waiting != nil && c.State.Waiting.Message != "" { + color.Red.Fprintln(a.output, c.State.Waiting.Message) } continue } - if container.State.Terminated != nil { - color.Purple.Fprintln(a.output, container.State.Terminated.Message) - continue - } - - if !a.trackedContainers.add(container.ContainerID) { - go a.streamContainerLogs(cancelCtx, pod, container) + if !a.trackedContainers.add(c.ContainerID) { + go a.streamContainerLogs(cancelCtx, pod, c) } } } @@ -151,21 +144,28 @@ func (a *LogAggregator) streamContainerLogs(ctx context.Context, pod *v1.Pod, co // In theory, it's more precise to use --since-time='' but there can be a time // difference between the user's machine and the server. // So we use --since=Xs and round up to the nearest second to not lose any log. - sinceSeconds := fmt.Sprintf("--since=%ds", sinceSeconds(time.Since(a.startTime))) + sinceSeconds := fmt.Sprintf("--since=%ds", sinceSeconds(time.Since(a.sinceTime))) tr, tw := io.Pipe() cmd := exec.CommandContext(ctx, "kubectl", "logs", sinceSeconds, "-f", pod.Name, "-c", container.Name, "--namespace", pod.Namespace) cmd.Stdout = tw - go util.RunCmd(cmd) - - color := a.colorPicker.Pick(pod) - prefix := prefix(pod, container) go func() { - if err := a.streamRequest(ctx, color, prefix, tr); err != nil { - logrus.Errorf("streaming request %s", err) - } - a.trackedContainers.remove(container.ContainerID) + util.RunCmd(cmd) + tw.Close() }() + + headerColor := a.colorPicker.Pick(pod) + prefix := prefix(pod, container) + if err := a.streamRequest(ctx, headerColor, prefix, tr); err != nil { + logrus.Errorf("streaming request %s", err) + } +} + +func (a *LogAggregator) printLogLine(headerColor color.Color, prefix, text string) { + if !a.IsMuted() { + headerColor.Fprintf(a.output, "%s ", prefix) + fmt.Fprint(a.output, text) + } } func prefix(pod *v1.Pod, container v1.ContainerStatus) string { @@ -175,38 +175,27 @@ func prefix(pod *v1.Pod, container v1.ContainerStatus) string { return fmt.Sprintf("[%s]", container.Name) } -func (a *LogAggregator) streamRequest(ctx context.Context, headerColor color.Color, header string, rc io.Reader) error { +func (a *LogAggregator) streamRequest(ctx context.Context, headerColor color.Color, prefix string, rc io.Reader) error { r := bufio.NewReader(rc) for { select { case <-ctx.Done(): - logrus.Infof("%s interrupted", header) + logrus.Infof("%s interrupted", prefix) return nil default: - } - - // Read up to newline - line, err := r.ReadBytes('\n') - if err == io.EOF { - break - } - if err != nil { - return errors.Wrap(err, "reading bytes from log stream") - } - - if a.IsMuted() { - continue - } + // Read up to newline + line, err := r.ReadString('\n') + if err == io.EOF { + a.printLogLine(headerColor, prefix, "\n") + return nil + } + if err != nil { + return errors.Wrap(err, "reading bytes from log stream") + } - if _, err := headerColor.Fprintf(a.output, "%s ", header); err != nil { - return errors.Wrap(err, "writing pod prefix header to out") - } - if _, err := fmt.Fprint(a.output, string(line)); err != nil { - return errors.Wrap(err, "writing pod log to out") + a.printLogLine(headerColor, prefix, line) } } - logrus.Infof("%s exited", header) - return nil } // Mute mutes the logs. @@ -240,12 +229,6 @@ func (t *trackedContainers) add(id string) bool { return alreadyTracked } -func (t *trackedContainers) remove(id string) { - t.Lock() - delete(t.ids, id) - t.Unlock() -} - // PodSelector is used to choose which pods to log. type PodSelector interface { Select(pod *v1.Pod) bool @@ -271,19 +254,12 @@ func (l *ImageList) Add(image string) { l.Unlock() } -// Remove removes an image from the list. -func (l *ImageList) Remove(image string) { - l.Lock() - delete(l.names, image) - l.Unlock() -} - // Select returns true if one of the pod's images is in the list. func (l *ImageList) Select(pod *v1.Pod) bool { l.RLock() defer l.RUnlock() - for _, container := range pod.Spec.Containers { + for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { if l.names[container.Image] { return true } diff --git a/pkg/skaffold/kubernetes/log_test.go b/pkg/skaffold/kubernetes/log_test.go index 7d0ddcf2931..8ecae0e6c12 100644 --- a/pkg/skaffold/kubernetes/log_test.go +++ b/pkg/skaffold/kubernetes/log_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/GoogleContainerTools/skaffold/testutil" + v1 "k8s.io/api/core/v1" ) func TestSinceSeconds(t *testing.T) { @@ -49,3 +50,41 @@ func TestSinceSeconds(t *testing.T) { }) } } + +func TestSelect(t *testing.T) { + var tests = []struct { + description string + images []string + podSpec v1.PodSpec + expectedMatch bool + }{ + { + description: "match container", + podSpec: v1.PodSpec{Containers: []v1.Container{{Image: "image1"}}}, + expectedMatch: true, + }, + { + description: "match init container", + podSpec: v1.PodSpec{InitContainers: []v1.Container{{Image: "image2"}}}, + expectedMatch: true, + }, + { + description: "no match", + podSpec: v1.PodSpec{Containers: []v1.Container{{Image: "image3"}}}, + expectedMatch: false, + }, + } + for _, test := range tests { + testutil.Run(t, test.description, func(t *testutil.T) { + list := NewImageList() + list.Add("image1") + list.Add("image2") + + selected := list.Select(&v1.Pod{ + Spec: test.podSpec, + }) + + t.CheckDeepEqual(test.expectedMatch, selected) + }) + } +} diff --git a/pkg/skaffold/runner/build_deploy.go b/pkg/skaffold/runner/build_deploy.go index 5c718e77c91..430b21fb91f 100644 --- a/pkg/skaffold/runner/build_deploy.go +++ b/pkg/skaffold/runner/build_deploy.go @@ -19,6 +19,7 @@ package runner import ( "context" "io" + "time" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/build" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest" @@ -79,14 +80,18 @@ func (r *SkaffoldRunner) DeployAndLog(ctx context.Context, out io.Writer, artifa logger := r.newLoggerForImages(out, imageNames) defer logger.Stop() - if err := logger.Start(ctx); err != nil { - return errors.Wrap(err, "starting logger") - } + // Logs should be retrieve up to just before the deploy + logger.SetSince(time.Now()) if err := r.Deploy(ctx, out, artifacts); err != nil { return err } + // Start printing the logs after deploy is finished + if err := logger.Start(ctx); err != nil { + return errors.Wrap(err, "starting logger") + } + <-ctx.Done() return nil diff --git a/pkg/skaffold/runner/dev.go b/pkg/skaffold/runner/dev.go index 730ddd02ef1..e07bfa0a106 100644 --- a/pkg/skaffold/runner/dev.go +++ b/pkg/skaffold/runner/dev.go @@ -19,6 +19,7 @@ package runner import ( "context" "io" + "time" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/color" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes/portforward" @@ -133,18 +134,21 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la return errors.Wrap(err, "exiting dev mode because first build failed") } - // Start logs - if r.runCtx.Opts.TailDev { - if err := logger.Start(ctx); err != nil { - return errors.Wrap(err, "starting logger") - } - } + // Logs should be retrieve up to just before the deploy + logger.SetSince(time.Now()) // First deploy if err := r.Deploy(ctx, out, r.builds); err != nil { return errors.Wrap(err, "exiting dev mode because first deploy failed") } + // Start printing the logs after deploy is finished + if r.runCtx.Opts.TailDev { + if err := logger.Start(ctx); err != nil { + return errors.Wrap(err, "starting logger") + } + } + // Forward ports if err := forwarderManager.Start(ctx); err != nil { return errors.Wrap(err, "starting forwarder manager")