Skip to content

Commit

Permalink
Issue #1104 - Remove container wait timeout from 'argo logs --follow' (
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmt authored Dec 28, 2018
1 parent 0f84e51 commit e09d9ad
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 68 deletions.
152 changes: 85 additions & 67 deletions cmd/argo/commands/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,32 @@ package commands

import (
"bufio"
"context"
"fmt"
"hash/fnv"

"math"
"os"
"strconv"
"strings"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
pkgwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/watch"

"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
wfinformers "github.com/argoproj/argo/pkg/client/informers/externalversions"
workflowv1 "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/pkg/errors"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

type logEntry struct {
Expand Down Expand Up @@ -101,8 +108,8 @@ func (p *logPrinter) PrintWorkflowLogs(workflow string) error {
return err
}
timeByPod := p.printRecentWorkflowLogs(wf)
if p.follow && wf.Status.Phase == v1alpha1.NodeRunning {
p.printLiveWorkflowLogs(wf, timeByPod)
if p.follow {
p.printLiveWorkflowLogs(wf.Name, wfClient, timeByPod)
}
return nil
}
Expand All @@ -114,7 +121,7 @@ func (p *logPrinter) PrintPodLogs(podName string) error {
return err
}
var logs []logEntry
err = p.getPodLogs("", podName, namespace, p.follow, p.tail, p.sinceSeconds, p.sinceTime, func(entry logEntry) {
err = p.getPodLogs(context.Background(), "", podName, namespace, p.follow, p.tail, p.sinceSeconds, p.sinceTime, func(entry logEntry) {
logs = append(logs, entry)
})
if err != nil {
Expand Down Expand Up @@ -144,7 +151,7 @@ func (p *logPrinter) printRecentWorkflowLogs(wf *v1alpha1.Workflow) map[string]*
go func() {
defer wg.Done()
var podLogs []logEntry
err := p.getPodLogs(getDisplayName(node), node.ID, wf.Namespace, false, p.tail, p.sinceSeconds, p.sinceTime, func(entry logEntry) {
err := p.getPodLogs(context.Background(), getDisplayName(node), node.ID, wf.Namespace, false, p.tail, p.sinceSeconds, p.sinceTime, func(entry logEntry) {
podLogs = append(podLogs, entry)
})

Expand Down Expand Up @@ -178,33 +185,12 @@ func (p *logPrinter) printRecentWorkflowLogs(wf *v1alpha1.Workflow) map[string]*
return timeByPod
}

// setupWorkflowInformer builds a shared informer over the workflows in
// namespace and registers handlers that invoke callback for the workflow
// called name; events for any other workflow are ignored.
//
// On update, callback receives the updated workflow, and done is true when its
// phase is anything other than v1alpha1.NodeRunning. On delete, callback
// receives the last known state of the workflow with done set to true.
//
// The informer is returned without being started; the caller must run it.
func (p *logPrinter) setupWorkflowInformer(namespace string, name string, callback func(wf *v1alpha1.Workflow, done bool)) cache.SharedIndexInformer {
	wfcClientset := wfclientset.NewForConfigOrDie(restConfig)
	wfInformerFactory := wfinformers.NewFilteredSharedInformerFactory(wfcClientset, 20*time.Minute, namespace, nil)
	informer := wfInformerFactory.Argoproj().V1alpha1().Workflows().Informer()
	informer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			UpdateFunc: func(_, cur interface{}) {
				// Checked assertion: never panic on an unexpected object type.
				updatedWf, ok := cur.(*v1alpha1.Workflow)
				if !ok || updatedWf.Name != name {
					return
				}
				callback(updatedWf, updatedWf.Status.Phase != v1alpha1.NodeRunning)
			},
			DeleteFunc: func(obj interface{}) {
				// When the informer misses a delete event it hands over a
				// cache.DeletedFinalStateUnknown tombstone wrapping the last
				// known object; unwrap it instead of asserting blindly.
				if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
					obj = tombstone.Obj
				}
				deletedWf, ok := obj.(*v1alpha1.Workflow)
				if !ok || deletedWf.Name != name {
					return
				}
				callback(deletedWf, true)
			},
		},
	)
	return informer
}

// Prints live logs for workflow pods, starting from the time specified in the timeByPod map.
func (p *logPrinter) printLiveWorkflowLogs(workflow *v1alpha1.Workflow, timeByPod map[string]*time.Time) {
func (p *logPrinter) printLiveWorkflowLogs(workflowName string, wfClient workflowv1.WorkflowInterface, timeByPod map[string]*time.Time) {
logs := make(chan logEntry)
streamedPods := make(map[string]bool)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

processPods := func(wf *v1alpha1.Workflow) {
for id := range wf.Status.Nodes {
Expand All @@ -218,7 +204,7 @@ func (p *logPrinter) printLiveWorkflowLogs(workflow *v1alpha1.Workflow, timeByPo
sinceTime := metav1.NewTime(podTime.Add(time.Second))
sinceTimePtr = &sinceTime
}
err := p.getPodLogs(getDisplayName(node), node.ID, wf.Namespace, true, nil, nil, sinceTimePtr, func(entry logEntry) {
err := p.getPodLogs(ctx, getDisplayName(node), node.ID, wf.Namespace, true, nil, nil, sinceTimePtr, func(entry logEntry) {
logs <- entry
})
if err != nil {
Expand All @@ -229,20 +215,31 @@ func (p *logPrinter) printLiveWorkflowLogs(workflow *v1alpha1.Workflow, timeByPo
}
}

processPods(workflow)
informer := p.setupWorkflowInformer(workflow.Namespace, workflow.Name, func(wf *v1alpha1.Workflow, done bool) {
if done {
close(logs)
} else {
processPods(wf)
}
})

stopChannel := make(chan struct{})
go func() {
informer.Run(stopChannel)
defer close(logs)
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", workflowName))
listOpts := metav1.ListOptions{FieldSelector: fieldSelector.String()}
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return wfClient.List(listOpts)
},
WatchFunc: func(options metav1.ListOptions) (pkgwatch.Interface, error) {
return wfClient.Watch(listOpts)
},
}
_, err := watch.UntilWithSync(ctx, lw, &v1alpha1.Workflow{}, nil, func(event pkgwatch.Event) (b bool, e error) {
if wf, ok := event.Object.(*v1alpha1.Workflow); ok {
if !wf.Status.Completed() {
processPods(wf)
}
return wf.Status.Completed(), nil
}
return true, nil
})
if err != nil {
log.Fatal(err)
}
}()
defer close(stopChannel)

for entry := range logs {
p.printLogEntry(entry)
Expand Down Expand Up @@ -273,35 +270,56 @@ func (p *logPrinter) printLogEntry(entry logEntry) {
fmt.Println(line)
}

func (p *logPrinter) ensureContainerStarted(podName string, podNamespace string, container string, retryCnt int, retryTimeout time.Duration) error {
for retryCnt > 0 {
pod, err := p.kubeClient.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
// hasContainerStarted fetches the pod podName in podNamespace and reports
// whether the named container has left the Waiting state.
//
// It returns false when the pod has no status entry for the container or when
// that entry is still Waiting, and true otherwise. An error from the pod
// lookup is returned as-is together with false.
func (p *logPrinter) hasContainerStarted(podName string, podNamespace string, container string) (bool, error) {
	pod, err := p.kubeClient.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{})
	if err != nil {
		return false, err
	}

	statuses := pod.Status.ContainerStatuses
	for i := range statuses {
		if statuses[i].Name != container {
			continue
		}
		// Only the first status entry with a matching name is considered.
		return statuses[i].State.Waiting == nil, nil
	}

	// No status has been reported for the container yet.
	return false, nil
}

func (p *logPrinter) getPodLogs(
ctx context.Context,
displayName string,
podName string,
podNamespace string,
follow bool,
tail *int64,
sinceSeconds *int64,
sinceTime *metav1.Time,
callback func(entry logEntry)) error {

for ctx.Err() == nil {
hasStarted, err := p.hasContainerStarted(podName, podNamespace, p.container)

if err != nil {
return err
}
var containerStatus *v1.ContainerStatus
for _, status := range pod.Status.ContainerStatuses {
if status.Name == container {
containerStatus = &status
break
if !hasStarted {
if follow {
time.Sleep(1 * time.Second)
} else {
return nil
}
}
if containerStatus == nil || containerStatus.State.Waiting != nil {
time.Sleep(retryTimeout)
retryCnt--
} else {
return nil
break
}
}
return fmt.Errorf("container '%s' of pod '%s' has not started within expected timeout", container, podName)
}

func (p *logPrinter) getPodLogs(
displayName string, podName string, podNamespace string, follow bool, tail *int64, sinceSeconds *int64, sinceTime *metav1.Time, callback func(entry logEntry)) error {
err := p.ensureContainerStarted(podName, podNamespace, p.container, 10, time.Second)
if err != nil {
return err
}
stream, err := p.kubeClient.CoreV1().Pods(podNamespace).GetLogs(podName, &v1.PodLogOptions{
Container: p.container,
Follow: follow,
Expand Down
4 changes: 3 additions & 1 deletion workflow/executor/k8sapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"syscall"
"time"

"github.com/argoproj/argo/util"

"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/workflow/common"
execcommon "github.com/argoproj/argo/workflow/executor/common"
Expand Down Expand Up @@ -100,7 +102,7 @@ func (c *k8sAPIClient) saveLogs(containerID, path string) error {
if err != nil {
return errors.InternalWrapError(err)
}
defer outFile.Close()
defer util.Close(outFile)
_, err = io.Copy(outFile, reader)
if err != nil {
return errors.InternalWrapError(err)
Expand Down

0 comments on commit e09d9ad

Please sign in to comment.