diff --git a/.codecov.yml b/.codecov.yml index 7c38e4e63..e5bbd7262 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -1,6 +1,10 @@ codecov: require_ci_to_pass: yes +# ignore files in demo package +ignore: + - "src/main/java/demo" + coverage: precision: 2 round: down diff --git a/formatter/formatting.gradle b/formatter/formatting.gradle index e3bc090e0..88d097623 100644 --- a/formatter/formatting.gradle +++ b/formatter/formatting.gradle @@ -35,6 +35,7 @@ allprojects { trimTrailingWhitespace() endWithNewline() + indentWithSpaces(4) } format("license", { licenseHeaderFile("${rootProject.file("formatter/license-header.txt")}", "package "); diff --git a/src/main/java/demo/CreateIndexWorkflowStep.java b/src/main/java/demo/CreateIndexWorkflowStep.java index d6c40c6ff..17b42567d 100644 --- a/src/main/java/demo/CreateIndexWorkflowStep.java +++ b/src/main/java/demo/CreateIndexWorkflowStep.java @@ -18,12 +18,18 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +/** + * Sample to show other devs how to pass data around. Will be deleted once other PRs are merged. + */ public class CreateIndexWorkflowStep implements WorkflowStep { private static final Logger logger = LogManager.getLogger(CreateIndexWorkflowStep.class); private final String name; + /** + * Instantiate this class. + */ public CreateIndexWorkflowStep() { this.name = "CREATE_INDEX"; } diff --git a/src/main/java/demo/DataDemo.java b/src/main/java/demo/DataDemo.java index cedf0f5e9..f2d606f07 100644 --- a/src/main/java/demo/DataDemo.java +++ b/src/main/java/demo/DataDemo.java @@ -10,16 +10,19 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.io.PathUtils; import org.opensearch.flowframework.template.ProcessNode; import org.opensearch.flowframework.template.TemplateParser; import org.opensearch.flowframework.workflow.WorkflowStep; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -45,11 +48,12 @@ public class DataDemo { * * @param args unused */ + @SuppressForbidden(reason = "just a demo class that will be deleted") public static void main(String[] args) { String path = "src/test/resources/template/datademo.json"; String json; try { - json = new String(Files.readAllBytes(Paths.get(path))); + json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8); } catch (IOException e) { logger.error("Failed to read JSON at path {}", path); return; @@ -67,6 +71,7 @@ public static void main(String[] args) { predecessors.isEmpty() ? " Can start immediately!" : String.format( + Locale.getDefault(), " Must wait for [%s] to complete first.", predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) ) diff --git a/src/main/java/demo/Demo.java b/src/main/java/demo/Demo.java index 0dba03169..58d977827 100644 --- a/src/main/java/demo/Demo.java +++ b/src/main/java/demo/Demo.java @@ -10,16 +10,19 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.io.PathUtils; import org.opensearch.flowframework.template.ProcessNode; import org.opensearch.flowframework.template.TemplateParser; import org.opensearch.flowframework.workflow.WorkflowStep; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -47,11 +50,12 @@ public class Demo { * * @param args unused */ + @SuppressForbidden(reason = "just a demo class that will be deleted") public static void main(String[] args) { String path = "src/test/resources/template/demo.json"; String json; try { - json = new String(Files.readAllBytes(Paths.get(path))); + json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8); } catch (IOException e) { logger.error("Failed to read JSON at path {}", path); return; @@ -69,6 +73,7 @@ public static void main(String[] args) { predecessors.isEmpty() ? " Can start immediately!" : String.format( + Locale.getDefault(), " Must wait for [%s] to complete first.", predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) ) diff --git a/src/main/java/demo/DemoWorkflowStep.java b/src/main/java/demo/DemoWorkflowStep.java index 866928f7c..037d9b6f6 100644 --- a/src/main/java/demo/DemoWorkflowStep.java +++ b/src/main/java/demo/DemoWorkflowStep.java @@ -14,11 +14,18 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +/** + * Demo workflowstep to show sequenced execution + */ public class DemoWorkflowStep implements WorkflowStep { private final long delay; private final String name; + /** + * Instantiate a step with a delay. + * @param delay milliseconds to take pretending to do work while really sleeping + */ public DemoWorkflowStep(long delay) { this.delay = delay; this.name = "DEMO_DELAY_" + delay; diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index bba8bfb1e..efca5a954 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -127,9 +127,9 @@ private static List topologicalSort(List nodes, List

sortedNodes = new ArrayList<>(); - // S ← Set of all nodes with no incoming edge + // S <- Set of all nodes with no incoming edge Queue sourceNodes = new ArrayDeque<>(); nodes.stream().filter(n -> !predecessorEdges.containsKey(n)).forEach(n -> sourceNodes.add(n)); if (sourceNodes.isEmpty()) { diff --git a/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java b/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java index f9fa328c6..3feab9f3b 100644 --- a/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java +++ b/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java @@ -8,15 +8,20 @@ */ package org.opensearch.flowframework.template; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope; + import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.flowframework.workflow.WorkflowStep; import org.opensearch.test.OpenSearchTestCase; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +@ThreadLeakScope(Scope.NONE) public class ProcessNodeTests extends OpenSearchTestCase { @Override @@ -37,21 +42,24 @@ public CompletableFuture execute(List data) { public String getName() { return "test"; } - }, WorkflowData.EMPTY); + }); assertEquals("A", nodeA.id()); assertEquals("test", nodeA.workflowStep().getName()); assertEquals(WorkflowData.EMPTY, nodeA.input()); - // FIXME: This is causing thread leaks + assertEquals(Collections.emptySet(), nodeA.getPredecessors()); + assertEquals("A", nodeA.toString()); + + // TODO: Once we can get OpenSearch Thread Pool for this execute method, create an IT and don't test execute here CompletableFuture f = nodeA.execute(); assertEquals(f, nodeA.getFuture()); f.orTimeout(5, TimeUnit.SECONDS); assertTrue(f.isDone()); assertEquals(WorkflowData.EMPTY, f.get()); - ProcessNode nodeB = new ProcessNode("B", null, null); + ProcessNode nodeB = new ProcessNode("B", null); assertNotEquals(nodeA, nodeB); - ProcessNode nodeA2 = new ProcessNode("A", null, null); + ProcessNode nodeA2 = new ProcessNode("A", null); assertEquals(nodeA, nodeA2); } } diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataTests.java new file mode 100644 index 000000000..42a1a1a03 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataTests.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; + +public class WorkflowDataTests extends OpenSearchTestCase { + + @Override + public void setUp() throws Exception { + super.setUp(); + } + + public void testWorkflowData() { + WorkflowData data = new WorkflowData() { + }; + assertEquals(Collections.emptyMap(), data.getParams()); + assertEquals(Collections.emptyMap(), data.getContent()); + } +} diff --git a/src/test/resources/template/datademo.json b/src/test/resources/template/datademo.json index e2b9eb386..a1323ed2c 100644 --- a/src/test/resources/template/datademo.json +++ b/src/test/resources/template/datademo.json @@ -1,20 +1,20 @@ { - "sequence": { - "nodes": [ - { - "id": "create_index", - "index_name": "demo" - }, - { - "id": "create_another_index", - "index_name": "second_demo" - } - ], - "edges": [ - { - "source": "create_index", - "dest": "create_another_index" - } - ] - } + "sequence": { + "nodes": [ + { + "id": "create_index", + "index_name": "demo" + }, + { + "id": "create_another_index", + "index_name": "second_demo" + } + ], + "edges": [ + { + "source": "create_index", + "dest": "create_another_index" + } + ] + } }