diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index 77cf30b916c..33c0560035a 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "io" + "os/exec" "sync" "sync/atomic" "time" @@ -84,6 +85,7 @@ func (a *LogAggregator) Start(ctx context.Context) error { for { select { case <-ctx.Done(): + watcher.Stop() return case evt, ok := <-watcher.ResultChan(): if !ok { @@ -101,7 +103,7 @@ func (a *LogAggregator) Start(ctx context.Context) error { } if a.podSelector.Select(pod) { - a.streamLogs(ctx, client, pod) + go a.streamLogs(ctx, client, pod) } } } @@ -112,11 +114,9 @@ func (a *LogAggregator) Start(ctx context.Context) error { } func (a *LogAggregator) streamLogs(ctx context.Context, client corev1.PodsGetter, pod *v1.Pod) error { - pods := client.Pods(pod.Namespace) - for _, container := range pod.Status.ContainerStatuses { containerID := container.ContainerID - if containerID == "" { + if containerID == "" || !container.Ready { continue } @@ -127,36 +127,26 @@ func (a *LogAggregator) streamLogs(ctx context.Context, client corev1.PodsGetter logrus.Infof("Stream logs from pod: %s container: %s", pod.Name, container.Name) - sinceSeconds := int64(time.Since(a.startTime).Seconds() + 0.5) - // 0s means all the logs - if sinceSeconds == 0 { - sinceSeconds = 1 - } - - req := pods.GetLogs(pod.Name, &v1.PodLogOptions{ - Follow: true, - Container: container.Name, - SinceSeconds: &sinceSeconds, - }) + tr, tw := io.Pipe() + go func() { + sinceSeconds := int64(time.Since(a.startTime).Seconds() + 0.5) + // 0s means all the logs + if sinceSeconds == 0 { + sinceSeconds = 1 + } - rc, err := req.Stream() - if err != nil { - a.trackedContainers.remove(containerID) - return errors.Wrap(err, "setting up container log stream") - } + cmd := exec.CommandContext(ctx, "kubectl", "logs", fmt.Sprintf("--since=%ds", sinceSeconds), "-f", pod.Name, "-c", container.Name) + cmd.Stdout = tw + cmd.Run() + }() color := a.colorPicker.Pick(pod) prefix := color.Sprint(prefix(pod, container)) - go func() { - defer func() { - a.trackedContainers.remove(containerID) - rc.Close() - }() - - if err := a.streamRequest(ctx, prefix, rc); err != nil { + if err := a.streamRequest(ctx, prefix, tr); err != nil { logrus.Errorf("streaming request %s", err) } + a.trackedContainers.remove(containerID) }() }