Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the logs #2323

Merged
merged 8 commits into from
Jun 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 42 additions & 66 deletions pkg/skaffold/kubernetes/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ import (
"sync/atomic"
"time"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
)

// Client is for tests
Expand All @@ -47,7 +46,7 @@ type LogAggregator struct {
colorPicker ColorPicker

muted int32
startTime time.Time
sinceTime time.Time
cancel context.CancelFunc
trackedContainers trackedContainers
}
Expand All @@ -65,12 +64,15 @@ func NewLogAggregator(out io.Writer, baseImageNames []string, podSelector PodSel
}
}

// SetSince records the lower-bound timestamp for log retrieval. Log streaming
// started later will only request entries newer than t (streamContainerLogs
// converts it into kubectl's --since=<seconds> flag).
func (a *LogAggregator) SetSince(t time.Time) {
	a.sinceTime = t
}

// Start starts a logger that listens to pods and tail their logs
// if they are matched by the `podSelector`.
func (a *LogAggregator) Start(ctx context.Context) error {
cancelCtx, cancel := context.WithCancel(ctx)
a.cancel = cancel
a.startTime = time.Now()

aggregate := make(chan watch.Event)
stopWatchers, err := AggregatePodWatcher(a.namespaces, aggregate)
Expand All @@ -91,10 +93,6 @@ func (a *LogAggregator) Start(ctx context.Context) error {
return
}

if evt.Type != watch.Added && evt.Type != watch.Modified {
continue
}

pod, ok := evt.Object.(*v1.Pod)
if !ok {
continue
Expand All @@ -104,21 +102,16 @@ func (a *LogAggregator) Start(ctx context.Context) error {
continue
}

for _, container := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
if container.ContainerID == "" {
if container.State.Waiting != nil && container.State.Waiting.Message != "" {
color.Red.Fprintln(a.output, container.State.Waiting.Message)
for _, c := range append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) {
if c.ContainerID == "" {
if c.State.Waiting != nil && c.State.Waiting.Message != "" {
color.Red.Fprintln(a.output, c.State.Waiting.Message)
}
continue
}

if container.State.Terminated != nil {
color.Purple.Fprintln(a.output, container.State.Terminated.Message)
continue
}

if !a.trackedContainers.add(container.ContainerID) {
go a.streamContainerLogs(cancelCtx, pod, container)
if !a.trackedContainers.add(c.ContainerID) {
go a.streamContainerLogs(cancelCtx, pod, c)
}
}
}
Expand Down Expand Up @@ -151,21 +144,28 @@ func (a *LogAggregator) streamContainerLogs(ctx context.Context, pod *v1.Pod, co
// In theory, it's more precise to use --since-time='' but there can be a time
// difference between the user's machine and the server.
// So we use --since=Xs and round up to the nearest second to not lose any log.
sinceSeconds := fmt.Sprintf("--since=%ds", sinceSeconds(time.Since(a.startTime)))
sinceSeconds := fmt.Sprintf("--since=%ds", sinceSeconds(time.Since(a.sinceTime)))

tr, tw := io.Pipe()
cmd := exec.CommandContext(ctx, "kubectl", "logs", sinceSeconds, "-f", pod.Name, "-c", container.Name, "--namespace", pod.Namespace)
cmd.Stdout = tw
go util.RunCmd(cmd)

color := a.colorPicker.Pick(pod)
prefix := prefix(pod, container)
go func() {
if err := a.streamRequest(ctx, color, prefix, tr); err != nil {
logrus.Errorf("streaming request %s", err)
}
a.trackedContainers.remove(container.ContainerID)
util.RunCmd(cmd)
tw.Close()
}()

headerColor := a.colorPicker.Pick(pod)
prefix := prefix(pod, container)
if err := a.streamRequest(ctx, headerColor, prefix, tr); err != nil {
logrus.Errorf("streaming request %s", err)
}
}

// printLogLine writes one log line to the aggregator's output, preceded by
// the colored pod/container prefix. Nothing is written while logs are muted.
func (a *LogAggregator) printLogLine(headerColor color.Color, prefix, text string) {
	if a.IsMuted() {
		return
	}
	headerColor.Fprintf(a.output, "%s ", prefix)
	fmt.Fprint(a.output, text)
}

func prefix(pod *v1.Pod, container v1.ContainerStatus) string {
Expand All @@ -175,38 +175,27 @@ func prefix(pod *v1.Pod, container v1.ContainerStatus) string {
return fmt.Sprintf("[%s]", container.Name)
}

func (a *LogAggregator) streamRequest(ctx context.Context, headerColor color.Color, header string, rc io.Reader) error {
func (a *LogAggregator) streamRequest(ctx context.Context, headerColor color.Color, prefix string, rc io.Reader) error {
r := bufio.NewReader(rc)
for {
select {
case <-ctx.Done():
logrus.Infof("%s interrupted", header)
logrus.Infof("%s interrupted", prefix)
return nil
default:
}

// Read up to newline
line, err := r.ReadBytes('\n')
if err == io.EOF {
break
}
if err != nil {
return errors.Wrap(err, "reading bytes from log stream")
}

if a.IsMuted() {
continue
}
// Read up to newline
line, err := r.ReadString('\n')
if err == io.EOF {
a.printLogLine(headerColor, prefix, "<Exited>\n")
return nil
}
if err != nil {
return errors.Wrap(err, "reading bytes from log stream")
}

if _, err := headerColor.Fprintf(a.output, "%s ", header); err != nil {
return errors.Wrap(err, "writing pod prefix header to out")
}
if _, err := fmt.Fprint(a.output, string(line)); err != nil {
return errors.Wrap(err, "writing pod log to out")
a.printLogLine(headerColor, prefix, line)
}
}
logrus.Infof("%s exited", header)
return nil
}

// Mute mutes the logs.
Expand Down Expand Up @@ -240,12 +229,6 @@ func (t *trackedContainers) add(id string) bool {
return alreadyTracked
}

func (t *trackedContainers) remove(id string) {
t.Lock()
delete(t.ids, id)
t.Unlock()
}

// PodSelector is used to choose which pods to log.
type PodSelector interface {
Select(pod *v1.Pod) bool
Expand All @@ -271,19 +254,12 @@ func (l *ImageList) Add(image string) {
l.Unlock()
}

// Remove removes an image from the list.
func (l *ImageList) Remove(image string) {
l.Lock()
delete(l.names, image)
l.Unlock()
}

// Select returns true if one of the pod's images is in the list.
func (l *ImageList) Select(pod *v1.Pod) bool {
l.RLock()
defer l.RUnlock()

for _, container := range pod.Spec.Containers {
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
if l.names[container.Image] {
return true
}
Expand Down
39 changes: 39 additions & 0 deletions pkg/skaffold/kubernetes/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/GoogleContainerTools/skaffold/testutil"
v1 "k8s.io/api/core/v1"
)

func TestSinceSeconds(t *testing.T) {
Expand Down Expand Up @@ -49,3 +50,41 @@ func TestSinceSeconds(t *testing.T) {
})
}
}

// TestSelect verifies that ImageList.Select matches pods whose regular or
// init containers use an image registered in the list.
// Fix: the table struct previously declared an `images []string` field that
// no case populated and the body never read — dead code, removed.
func TestSelect(t *testing.T) {
	var tests = []struct {
		description   string
		podSpec       v1.PodSpec
		expectedMatch bool
	}{
		{
			description:   "match container",
			podSpec:       v1.PodSpec{Containers: []v1.Container{{Image: "image1"}}},
			expectedMatch: true,
		},
		{
			description:   "match init container",
			podSpec:       v1.PodSpec{InitContainers: []v1.Container{{Image: "image2"}}},
			expectedMatch: true,
		},
		{
			description:   "no match",
			podSpec:       v1.PodSpec{Containers: []v1.Container{{Image: "image3"}}},
			expectedMatch: false,
		},
	}
	for _, test := range tests {
		testutil.Run(t, test.description, func(t *testutil.T) {
			// The list contains image1 and image2; image3 must not match.
			list := NewImageList()
			list.Add("image1")
			list.Add("image2")

			selected := list.Select(&v1.Pod{
				Spec: test.podSpec,
			})

			t.CheckDeepEqual(test.expectedMatch, selected)
		})
	}
}
11 changes: 8 additions & 3 deletions pkg/skaffold/runner/build_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package runner
import (
"context"
"io"
"time"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
Expand Down Expand Up @@ -79,14 +80,18 @@ func (r *SkaffoldRunner) DeployAndLog(ctx context.Context, out io.Writer, artifa
logger := r.newLoggerForImages(out, imageNames)
defer logger.Stop()

if err := logger.Start(ctx); err != nil {
return errors.Wrap(err, "starting logger")
}
// Logs should be retrieved up to just before the deploy
logger.SetSince(time.Now())

if err := r.Deploy(ctx, out, artifacts); err != nil {
return err
}

// Start printing the logs after deploy is finished
if err := logger.Start(ctx); err != nil {
return errors.Wrap(err, "starting logger")
}

<-ctx.Done()

return nil
Expand Down
16 changes: 10 additions & 6 deletions pkg/skaffold/runner/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package runner
import (
"context"
"io"
"time"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes/portforward"
Expand Down Expand Up @@ -133,18 +134,21 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
return errors.Wrap(err, "exiting dev mode because first build failed")
}

// Start logs
if r.runCtx.Opts.TailDev {
if err := logger.Start(ctx); err != nil {
return errors.Wrap(err, "starting logger")
}
}
// Logs should be retrieved up to just before the deploy
logger.SetSince(time.Now())

// First deploy
if err := r.Deploy(ctx, out, r.builds); err != nil {
return errors.Wrap(err, "exiting dev mode because first deploy failed")
}

// Start printing the logs after deploy is finished
if r.runCtx.Opts.TailDev {
if err := logger.Start(ctx); err != nil {
return errors.Wrap(err, "starting logger")
}
}

// Forward ports
if err := forwarderManager.Start(ctx); err != nil {
return errors.Wrap(err, "starting forwarder manager")
Expand Down