diff --git a/Makefile b/Makefile index 84e1667b213d..55e427234655 100644 --- a/Makefile +++ b/Makefile @@ -316,6 +316,7 @@ test-results/junit.xml: $(GOPATH)/bin/go-junit-report test-results/test.out cat test-results/test.out | go-junit-report > test-results/junit.xml dist/$(PROFILE).yaml: $(MANIFESTS) $(E2E_MANIFESTS) + mkdir -p dist kustomize build --load_restrictor=none test/e2e/manifests/$(PROFILE) | sed 's/:latest/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' > dist/$(PROFILE).yaml .PHONY: install diff --git a/server/workflow/workflow_server.go b/server/workflow/workflow_server.go index 017d3e34859e..2aaa20c0c8f6 100644 --- a/server/workflow/workflow_server.go +++ b/server/workflow/workflow_server.go @@ -3,12 +3,8 @@ package workflow import ( "encoding/json" "fmt" - "reflect" "sort" - "github.com/argoproj/argo/pkg/client/clientset/versioned" - "github.com/argoproj/argo/workflow/creator" - log "github.com/sirupsen/logrus" "golang.org/x/net/context" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -17,12 +13,14 @@ import ( "github.com/argoproj/argo/persist/sqldb" workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow" "github.com/argoproj/argo/pkg/apis/workflow" - "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo/pkg/client/clientset/versioned" "github.com/argoproj/argo/server/auth" argoutil "github.com/argoproj/argo/util" "github.com/argoproj/argo/util/instanceid" "github.com/argoproj/argo/util/logs" "github.com/argoproj/argo/workflow/common" + "github.com/argoproj/argo/workflow/creator" "github.com/argoproj/argo/workflow/hydrator" "github.com/argoproj/argo/workflow/templateresolution" "github.com/argoproj/argo/workflow/util" @@ -42,7 +40,7 @@ func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRe return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo)} } -func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*v1alpha1.Workflow, error) { +func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*wfv1.Workflow, error) { wfClient := auth.GetWfClient(ctx) if req.Workflow == nil { @@ -83,7 +81,7 @@ func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.Wo return wf, nil } -func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.WorkflowGetRequest) (*v1alpha1.Workflow, error) { +func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.WorkflowGetRequest) (*wfv1.Workflow, error) { wfGetOption := metav1.GetOptions{} if req.GetOptions != nil { wfGetOption = *req.GetOptions @@ -104,7 +102,7 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf return wf, err } -func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*v1alpha1.WorkflowList, error) { +func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) { wfClient := auth.GetWfClient(ctx) var listOption = &metav1.ListOptions{} @@ -135,7 +133,7 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor // we make no promises about the overall list sorting, we just sort each page sort.Sort(wfList.Items) - return &v1alpha1.WorkflowList{ListMeta: metav1.ListMeta{Continue: wfList.Continue, ResourceVersion: wfList.ResourceVersion}, Items: wfList.Items}, nil + return &wfv1.WorkflowList{ListMeta: metav1.ListMeta{Continue: wfList.Continue, ResourceVersion: wfList.ResourceVersion}, Items: wfList.Items}, nil } func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest, ws workflowpkg.WorkflowService_WatchWorkflowsServer) error { @@ -171,9 +169,14 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest, select { case <-ctx.Done(): return nil - case event, open := <-watch.ResultChan(): - if !open { + case event, ok := <-watch.ResultChan(): + var wf *wfv1.Workflow + if ok { + wf, ok = event.Object.(*wfv1.Workflow) + } + if !ok { log.Debug("Re-establishing workflow watch") + watch.Stop() watch, err = wfIf.Watch(*opts) if err != nil { return err @@ -181,10 +184,6 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest, continue } log.Debug("Received event") - wf, ok := event.Object.(*v1alpha1.Workflow) - if !ok { - return fmt.Errorf("watch object was not a workflow %v", reflect.TypeOf(event.Object)) - } logCtx := log.WithFields(log.Fields{"workflow": wf.Name, "type": event.Type, "phase": wf.Status.Phase}) err := s.hydrator.Hydrate(wf) if err != nil { @@ -195,6 +194,8 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest, if err != nil { return err } + // when we re-establish, we want to start at the same place + opts.ResourceVersion = wf.ResourceVersion } } } @@ -216,7 +217,7 @@ func (s *workflowServer) DeleteWorkflow(ctx context.Context, req *workflowpkg.Wo return &workflowpkg.WorkflowDeleteResponse{}, nil } -func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.WorkflowRetryRequest) (*v1alpha1.Workflow, error) { +func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.WorkflowRetryRequest) (*wfv1.Workflow, error) { wfClient := auth.GetWfClient(ctx) kubeClient := auth.GetKubeClient(ctx) @@ -237,7 +238,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor return wf, nil } -func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg.WorkflowResubmitRequest) (*v1alpha1.Workflow, error) { +func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg.WorkflowResubmitRequest) (*wfv1.Workflow, error) { wfClient := auth.GetWfClient(ctx) wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{}) if err != nil { @@ -254,14 +255,14 @@ func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg. return nil, err } - created, err := util.SubmitWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wfClient, req.Namespace, newWF, &v1alpha1.SubmitOpts{}) + created, err := util.SubmitWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wfClient, req.Namespace, newWF, &wfv1.SubmitOpts{}) if err != nil { return nil, err } return created, nil } -func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.WorkflowResumeRequest) (*v1alpha1.Workflow, error) { +func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.WorkflowResumeRequest) (*wfv1.Workflow, error) { wfClient := auth.GetWfClient(ctx) wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{}) if err != nil { @@ -287,7 +288,7 @@ func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.Wo return wf, nil } -func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.WorkflowSuspendRequest) (*v1alpha1.Workflow, error) { +func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.WorkflowSuspendRequest) (*wfv1.Workflow, error) { wfClient := auth.GetWfClient(ctx) wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{}) @@ -313,7 +314,7 @@ func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.W return wf, nil } -func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg.WorkflowTerminateRequest) (*v1alpha1.Workflow, error) { +func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg.WorkflowTerminateRequest) (*wfv1.Workflow, error) { wfClient := auth.GetWfClient(ctx) wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{}) @@ -338,7 +339,7 @@ func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg return wf, nil } -func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.WorkflowStopRequest) (*v1alpha1.Workflow, error) { +func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.WorkflowStopRequest) (*wfv1.Workflow, error) { wfClient := auth.GetWfClient(ctx) wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{}) if err != nil { @@ -360,7 +361,7 @@ func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.Work return wf, nil } -func (s *workflowServer) SetWorkflow(ctx context.Context, req *workflowpkg.WorkflowSetRequest) (*v1alpha1.Workflow, error) { +func (s *workflowServer) SetWorkflow(ctx context.Context, req *workflowpkg.WorkflowSetRequest) (*wfv1.Workflow, error) { wfClient := auth.GetWfClient(ctx) wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{}) if err != nil { @@ -371,9 +372,9 @@ func (s *workflowServer) SetWorkflow(ctx context.Context, req *workflowpkg.Workf return nil, err } - phaseToSet := v1alpha1.NodePhase(req.Phase) + phaseToSet := wfv1.NodePhase(req.Phase) switch phaseToSet { - case v1alpha1.NodeSucceeded, v1alpha1.NodeFailed, v1alpha1.NodeError, "": + case wfv1.NodeSucceeded, wfv1.NodeFailed, wfv1.NodeError, "": // Do nothing, passes validation default: return nil, fmt.Errorf("%s is an invalid phase to set to", req.Phase) @@ -405,7 +406,7 @@ func (s *workflowServer) SetWorkflow(ctx context.Context, req *workflowpkg.Workf return wf, nil } -func (s *workflowServer) LintWorkflow(ctx context.Context, req *workflowpkg.WorkflowLintRequest) (*v1alpha1.Workflow, error) { +func (s *workflowServer) LintWorkflow(ctx context.Context, req *workflowpkg.WorkflowLintRequest) (*wfv1.Workflow, error) { wfClient := auth.GetWfClient(ctx) wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().WorkflowTemplates(req.Namespace)) cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates()) @@ -437,7 +438,7 @@ func (s *workflowServer) PodLogs(req *workflowpkg.WorkflowLogRequest, ws workflo return logs.WorkflowLogs(ctx, wfClient, kubeClient, req, ws) } -func (s *workflowServer) getWorkflow(wfClient versioned.Interface, namespace string, name string, options metav1.GetOptions) (*v1alpha1.Workflow, error) { +func (s *workflowServer) getWorkflow(wfClient versioned.Interface, namespace string, name string, options metav1.GetOptions) (*wfv1.Workflow, error) { if name == latestAlias { latest, err := getLatestWorkflow(wfClient, namespace) if err != nil { @@ -453,11 +454,11 @@ func (s *workflowServer) getWorkflow(wfClient versioned.Interface, namespace str return wf, nil } -func (s *workflowServer) validateWorkflow(wf *v1alpha1.Workflow) error { +func (s *workflowServer) validateWorkflow(wf *wfv1.Workflow) error { return s.instanceIDService.Validate(wf) } -func getLatestWorkflow(wfClient versioned.Interface, namespace string) (*v1alpha1.Workflow, error) { +func getLatestWorkflow(wfClient versioned.Interface, namespace string) (*wfv1.Workflow, error) { wfList, err := wfClient.ArgoprojV1alpha1().Workflows(namespace).List(metav1.ListOptions{}) if err != nil { return nil, err @@ -474,9 +475,9 @@ func getLatestWorkflow(wfClient versioned.Interface, namespace string) (*v1alpha return &latest, nil } -func (s *workflowServer) SubmitWorkflow(ctx context.Context, req *workflowpkg.WorkflowSubmitRequest) (*v1alpha1.Workflow, error) { +func (s *workflowServer) SubmitWorkflow(ctx context.Context, req *workflowpkg.WorkflowSubmitRequest) (*wfv1.Workflow, error) { wfClient := auth.GetWfClient(ctx) - var wf *v1alpha1.Workflow + var wf *wfv1.Workflow switch req.ResourceKind { case workflow.CronWorkflowKind, workflow.CronWorkflowSingular, workflow.CronWorkflowPlural, workflow.CronWorkflowShortName: cronWf, err := wfClient.ArgoprojV1alpha1().CronWorkflows(req.Namespace).Get(req.ResourceName, metav1.GetOptions{}) diff --git a/util/logs/workflow-logger.go b/util/logs/workflow-logger.go index 49ab4dfaae04..30215e8a3c4a 100644 --- a/util/logs/workflow-logger.go +++ b/util/logs/workflow-logger.go @@ -3,7 +3,6 @@ package logs import ( "bufio" "context" - "reflect" "sort" "strings" "sync" @@ -164,9 +163,14 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient select { case <-ctx.Done(): return - case event, open := <-wfWatch.ResultChan(): - if !open { + case event, ok := <-wfWatch.ResultChan(): + var wf *wfv1.Workflow + if ok { + wf, ok = event.Object.(*wfv1.Workflow) + } + if !ok { logCtx.Debug("Re-establishing workflow watch") + wfWatch.Stop() wfWatch, err = wfInterface.Watch(wfListOptions) if err != nil { logCtx.Error(err) @@ -174,15 +178,12 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient } continue } - wf, ok := event.Object.(*wfv1.Workflow) - if !ok { - logCtx.Errorf("watch object was not a workflow %v", reflect.TypeOf(event.Object)) - return - } logCtx.WithFields(log.Fields{"eventType": event.Type, "completed": wf.Status.Fulfilled()}).Debug("Workflow event") if event.Type == watch.Deleted || wf.Status.Fulfilled() { return } + // in case we re-establish the watch, make sure we start at the same place + wfListOptions.ResourceVersion = wf.ResourceVersion } } }() @@ -197,9 +198,14 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient select { case <-stopWatchingPods: return - case event, open := <-podWatch.ResultChan(): - if !open { + case event, ok := <-podWatch.ResultChan(): + var pod *corev1.Pod + if ok { + pod, ok = event.Object.(*corev1.Pod) + } + if !ok { logCtx.Info("Re-establishing pod watch") + podWatch.Stop() podWatch, err = podInterface.Watch(podListOptions) if err != nil { logCtx.Error(err) @@ -207,15 +213,11 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient } continue } - pod, ok := event.Object.(*corev1.Pod) - if !ok { - logCtx.Errorf("watch object was not a pod %v", reflect.TypeOf(event.Object)) - return - } logCtx.WithFields(log.Fields{"eventType": event.Type, "podName": pod.GetName(), "phase": pod.Status.Phase}).Debug("Pod event") if pod.Status.Phase == corev1.PodRunning { ensureWeAreStreaming(pod) } + podListOptions.ResourceVersion = pod.ResourceVersion } } }()