From 660bbb68f2e878700cb256898c68c75f00ee99d1 Mon Sep 17 00:00:00 2001 From: Yuan Tang Date: Tue, 11 Jul 2023 15:14:28 -0400 Subject: [PATCH] fix: Live workflow takes precedence during merge to correctly display in the UI (#11336) Signed-off-by: Yuan Tang --- server/workflow/workflow_server.go | 37 +++++++++++++++++++++++++ server/workflow/workflow_server_test.go | 21 ++++++++++++++ ui/src/models/workflows.ts | 4 +++ 3 files changed, 62 insertions(+) diff --git a/server/workflow/workflow_server.go b/server/workflow/workflow_server.go index 066ac274abbf..13f200edff90 100644 --- a/server/workflow/workflow_server.go +++ b/server/workflow/workflow_server.go @@ -19,6 +19,7 @@ import ( "github.com/argoproj/argo-workflows/v3/persist/sqldb" workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow" + "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" "github.com/argoproj/argo-workflows/v3/server/auth" @@ -127,6 +128,42 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf return wf, nil } +func mergeWithArchivedWorkflows(liveWfs v1alpha1.WorkflowList, archivedWfs v1alpha1.WorkflowList, numWfsToKeep int) *v1alpha1.WorkflowList { + var mergedWfs []v1alpha1.Workflow + var uidToWfs = map[types.UID][]v1alpha1.Workflow{} + for _, item := range liveWfs.Items { + uidToWfs[item.UID] = append(uidToWfs[item.UID], item) + } + for _, item := range archivedWfs.Items { + uidToWfs[item.UID] = append(uidToWfs[item.UID], item) + } + + for _, v := range uidToWfs { + // The archived workflow we saved in the database will only have "Pending" as the archival status. + // We want to only keep the workflow that has the correct label to display correctly in the UI. + if len(v) == 1 { + mergedWfs = append(mergedWfs, v[0]) + } else { + if ok := v[0].Labels[common.LabelKeyWorkflowArchivingStatus] == "Archived"; ok { + mergedWfs = append(mergedWfs, v[0]) + } else { + mergedWfs = append(mergedWfs, v[1]) + } + } + } + mergedWfsList := v1alpha1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta} + sort.Sort(mergedWfsList.Items) + numWfs := 0 + var finalWfs []v1alpha1.Workflow + for _, item := range mergedWfsList.Items { + if numWfsToKeep == 0 || numWfs < numWfsToKeep { + finalWfs = append(finalWfs, item) + numWfs += 1 + } + } + return &v1alpha1.WorkflowList{Items: finalWfs, ListMeta: liveWfs.ListMeta} +} + func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) { wfClient := auth.GetWfClient(ctx) diff --git a/server/workflow/workflow_server_test.go b/server/workflow/workflow_server_test.go index 4cd92647ec40..e70c63843800 100644 --- a/server/workflow/workflow_server_test.go +++ b/server/workflow/workflow_server_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/go-jose/go-jose/v3/jwt" "github.com/stretchr/testify/assert" @@ -630,6 +631,26 @@ func (t testWatchWorkflowServer) Send(*workflowpkg.WorkflowWatchEvent) error { panic("implement me") } +func TestMergeWithArchivedWorkflows(t *testing.T) { + timeNow := time.Now() + wf1Live := v1alpha1.Workflow{ + ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)}, + Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Archived"}}} + wf1Archived := v1alpha1.Workflow{ + ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)}, + Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Pending"}}} + wf2 := v1alpha1.Workflow{ + ObjectMeta: metav1.ObjectMeta{UID: "2", CreationTimestamp: metav1.Time{Time: timeNow.Add(2 * time.Second)}}} + wf3 := v1alpha1.Workflow{ + ObjectMeta: metav1.ObjectMeta{UID: "3", CreationTimestamp: metav1.Time{Time: timeNow.Add(3 * time.Second)}}} + liveWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Live, wf2}} + archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Archived, wf3, wf2}} + expectedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1Live}} + expectedShortWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2}} + assert.Equal(t, expectedWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 0).Items) + assert.Equal(t, expectedShortWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 2).Items) +} + func TestWatchWorkflows(t *testing.T) { server, ctx := getWorkflowServer() wf := &v1alpha1.Workflow{ diff --git a/ui/src/models/workflows.ts b/ui/src/models/workflows.ts index 98720dbad5df..bd2a9a13b1c5 100644 --- a/ui/src/models/workflows.ts +++ b/ui/src/models/workflows.ts @@ -539,6 +539,10 @@ export interface Workflow { export const execSpec = (w: Workflow) => Object.assign({}, w.status.storedWorkflowTemplateSpec, w.spec); +export function isArchivedWorkflow(wf: Workflow): boolean { + return wf.metadata.labels && wf.metadata.labels['workflows.argoproj.io/workflow-archiving-status'] === 'Archived'; +} + export type NodeType = 'Pod' | 'Container' | 'Steps' | 'StepGroup' | 'DAG' | 'Retry' | 'Skipped' | 'TaskGroup' | 'Suspend'; export interface NodeStatus {