Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Fix unused version in Running Workflows metric #3386

Merged
merged 5 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,13 @@ public static void recordTaskInProgress(String taskType, long size, String owner
StringUtils.defaultIfBlank(ownerApp, "unknown"));
}

public static void recordRunningWorkflows(
long count, String name, String version, String ownerApp) {
public static void recordRunningWorkflows(long count, String name, String ownerApp) {
gauge(
classQualifier,
"workflow_running",
count,
"workflowName",
name,
"version",
version,
"ownerApp",
StringUtils.defaultIfBlank(ownerApp, "unknown"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@
package com.netflix.conductor.metrics;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,6 +29,7 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.netflix.conductor.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
Expand Down Expand Up @@ -78,14 +84,13 @@ public void reportMetrics() {
refreshCounter = metadataRefreshInterval;
}

workflowDefs.forEach(
workflowDef -> {
String name = workflowDef.getName();
String version = String.valueOf(workflowDef.getVersion());
String ownerApp = workflowDef.getOwnerApp();
long count = executionDAOFacade.getPendingWorkflowCount(name);
Monitors.recordRunningWorkflows(count, name, version, ownerApp);
});
getPendingWorkflowToOwnerAppMap(workflowDefs)
.forEach(
(workflowName, ownerApp) -> {
long count =
executionDAOFacade.getPendingWorkflowCount(workflowName);
Monitors.recordRunningWorkflows(count, workflowName, ownerApp);
});

taskDefs.forEach(
taskDef -> {
Expand Down Expand Up @@ -115,4 +120,26 @@ public void reportMetrics() {
LOGGER.error("Error while publishing scheduled metrics", e);
}
}

/**
* Pending workflow data does not contain information about version. We only need the owner app
* and workflow name, and we only need to query for the workflow once.
*/
@VisibleForTesting
Map<String, String> getPendingWorkflowToOwnerAppMap(List<WorkflowDef> workflowDefs) {
final Map<String, List<WorkflowDef>> groupedWorkflowDefs =
workflowDefs.stream().collect(Collectors.groupingBy(WorkflowDef::getName));

Map<String, String> workflowNameToOwnerMap = new HashMap<>();
groupedWorkflowDefs.forEach(
(key, value) -> {
final WorkflowDef workflowDef =
value.stream()
.max(Comparator.comparing(WorkflowDef::getVersion))
.orElseThrow(NoSuchElementException::new);

workflowNameToOwnerMap.put(key, workflowDef.getOwnerApp());
});
return workflowNameToOwnerMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.metrics;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.springframework.test.context.junit4.SpringRunner;

import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.service.MetadataService;

@RunWith(SpringRunner.class)
public class WorkflowMonitorTest {

@Mock private MetadataService metadataService;
@Mock private QueueDAO queueDAO;
@Mock private ExecutionDAOFacade executionDAOFacade;

private WorkflowMonitor workflowMonitor;

@Before
public void beforeEach() {
workflowMonitor =
new WorkflowMonitor(metadataService, queueDAO, executionDAOFacade, 1000, Set.of());
}

private WorkflowDef makeDef(String name, int version, String ownerApp) {
WorkflowDef wd = new WorkflowDef();
wd.setName(name);
wd.setVersion(version);
wd.setOwnerApp(ownerApp);
return wd;
}

@Test
public void testPendingWorkflowDataMap() {
WorkflowDef test1_1 = makeDef("test1", 1, null);
WorkflowDef test1_2 = makeDef("test1", 2, "name1");

WorkflowDef test2_1 = makeDef("test2", 1, "first");
WorkflowDef test2_2 = makeDef("test2", 2, "mid");
WorkflowDef test2_3 = makeDef("test2", 3, "last");

final Map<String, String> mapping =
workflowMonitor.getPendingWorkflowToOwnerAppMap(
List.of(test1_1, test1_2, test2_1, test2_2, test2_3));

Assert.assertEquals(2, mapping.keySet().size());
Assert.assertTrue(mapping.containsKey("test1"));
Assert.assertTrue(mapping.containsKey("test2"));

Assert.assertEquals("name1", mapping.get("test1"));
Assert.assertEquals("last", mapping.get("test2"));
}
}