Skip to content

Commit

Permalink
Add an HttpHost Workflow Step
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <widdis@gmail.com>
  • Loading branch information
dbwiddis committed Feb 20, 2024
1 parent be0df19 commit 172f47a
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x)
### Features
- Add HttpHost WorkflowStep ([#530](https://github.com/opensearch-project/flow-framework/pull/530))

### Enhancements
### Bug Fixes
### Infrastructure
Expand Down
2 changes: 1 addition & 1 deletion DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ snapshots/
### Adding Workflow Steps

To add functionality to workflows, add new Workflow Steps to the [`org.opensearch.flowframework.workflow`](https://github.com/opensearch-project/flow-framework/tree/main/src/main/java/org/opensearch/flowframework/workflow) package.
1. Implement the [Workflow](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java) interface. See existing steps for examples for input, output, and API execution.
1. Implement the [WorkflowStep](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java) interface. See existing steps for examples for input, output, and API execution.
2. Choose a unique name for the step which is not used by other steps. This will align with the `step_type` field in the templates and should be descriptive of what the step does.
3. Add a constructor and call it from the [WorkflowStepFactory](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java).
4. Add an entry to the [WorkflowStepFactory](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java) enum specifying required inputs, outputs, required plugins, and optionally a different timeout than the default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ private CommonValue() {}
public static final String CREATED_TIME = "created_time";
/** The last updated time field for an agent */
public static final String LAST_UPDATED_TIME_FIELD = "last_updated_time";
/** HttpHost */
public static final String HTTP_HOST_FIELD = "http_host";
/** Http scheme */
public static final String SCHEME_FIELD = "scheme";
/** Http hostname */
public static final String HOSTNAME_FIELD = "hostname";
/** Http port */
public static final String PORT_FIELD = "port";

/*
* Constants associated with resource provisioning / state
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/org/opensearch/flowframework/workflow/HttpHostStep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.hc.core5.http.HttpHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.util.ParseUtils;

import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.HOSTNAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.HTTP_HOST_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PORT_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SCHEME_FIELD;

/**
* Step to register parameters for an HTTP Connection to a Host
*/
public class HttpHostStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(HttpHostStep.class);
PlainActionFuture<WorkflowData> hostFuture = PlainActionFuture.newFuture();
static final String NAME = "http_host";

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs
) {
Set<String> requiredKeys = Set.of(SCHEME_FIELD, HOSTNAME_FIELD, PORT_FIELD);
Set<String> optionalKeys = Collections.emptySet();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs
);

String scheme = validScheme(inputs.get(SCHEME_FIELD));
String hostname = inputs.get(HOSTNAME_FIELD).toString();
int port = validPort(inputs.get(PORT_FIELD));

HttpHost httpHost = new HttpHost(scheme, hostname, port);

hostFuture.onResponse(
new WorkflowData(
Map.ofEntries(Map.entry(HTTP_HOST_FIELD, httpHost)),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);

logger.info("Http Host registered successfully {}", httpHost);

} catch (FlowFrameworkException e) {
hostFuture.onFailure(e);
}
return hostFuture;
}

private String validScheme(Object o) {
String scheme = o.toString().toLowerCase(Locale.ROOT);
if ("http".equals(scheme) || "https".equals(scheme)) {
return scheme;
}
throw new FlowFrameworkException("http_host scheme must be http or https", RestStatus.BAD_REQUEST);
}

private int validPort(Object o) {
try {
int port = Integer.parseInt(o.toString());
if ((port & 0xffff0000) != 0) {
throw new FlowFrameworkException("http_host port number must be between 0 and 65535", RestStatus.BAD_REQUEST);
}
return port;
} catch (NumberFormatException e) {
throw new FlowFrameworkException("http_host port must be a number between 0 and 65535", RestStatus.BAD_REQUEST);
}
}

@Override
public String getName() {
return NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,19 @@
import static org.opensearch.flowframework.common.CommonValue.EMBEDDING_DIMENSION;
import static org.opensearch.flowframework.common.CommonValue.FRAMEWORK_TYPE;
import static org.opensearch.flowframework.common.CommonValue.FUNCTION_NAME;
import static org.opensearch.flowframework.common.CommonValue.HOSTNAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.HTTP_HOST_FIELD;
import static org.opensearch.flowframework.common.CommonValue.MODEL_CONTENT_HASH_VALUE;
import static org.opensearch.flowframework.common.CommonValue.MODEL_FORMAT;
import static org.opensearch.flowframework.common.CommonValue.MODEL_GROUP_STATUS;
import static org.opensearch.flowframework.common.CommonValue.MODEL_TYPE;
import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.OPENSEARCH_ML;
import static org.opensearch.flowframework.common.CommonValue.PARAMETERS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PORT_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.SCHEME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
import static org.opensearch.flowframework.common.CommonValue.TOOLS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.TYPE;
Expand Down Expand Up @@ -234,7 +238,17 @@ public enum WorkflowSteps {
),

/** Create Tool Step */
CREATE_TOOL(ToolStep.NAME, List.of(TYPE), List.of(TOOLS_FIELD), List.of(OPENSEARCH_ML), null, ToolStep::new);
CREATE_TOOL(ToolStep.NAME, List.of(TYPE), List.of(TOOLS_FIELD), List.of(OPENSEARCH_ML), null, ToolStep::new),

/** Http Host Step */
HTTP_HOST(
HttpHostStep.NAME,
List.of(SCHEME_FIELD, HOSTNAME_FIELD, PORT_FIELD),
List.of(HTTP_HOST_FIELD),
Collections.emptyList(),
null,
HttpHostStep::new
);

private final String workflowStepName;
private final List<String> inputs;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.hc.core5.http.HttpHost;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.opensearch.flowframework.common.CommonValue.HOSTNAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.HTTP_HOST_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PORT_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SCHEME_FIELD;

public class HttpHostStepTests extends OpenSearchTestCase {

public void testHttpHost() throws InterruptedException, ExecutionException {
HttpHostStep httpHostStep = new HttpHostStep();
assertEquals(HttpHostStep.NAME, httpHostStep.getName());

WorkflowData inputData = new WorkflowData(
Map.ofEntries(Map.entry(SCHEME_FIELD, "http"), Map.entry(HOSTNAME_FIELD, "localhost"), Map.entry(PORT_FIELD, 1234)),
"test-id",
"test-node-id"
);

PlainActionFuture<WorkflowData> future = httpHostStep.execute(
inputData.getNodeId(),
inputData,
Collections.emptyMap(),
Collections.emptyMap()
);

assertTrue(future.isDone());
assertEquals(HttpHost.class, future.get().getContent().get(HTTP_HOST_FIELD).getClass());
HttpHost host = (HttpHost) future.get().getContent().get(HTTP_HOST_FIELD);
assertEquals("http", host.getSchemeName());
assertEquals("localhost", host.getHostName());
assertEquals(1234, host.getPort());
}

public void testBadScheme() {
HttpHostStep httpHostStep = new HttpHostStep();

WorkflowData badSchemeData = new WorkflowData(
Map.ofEntries(Map.entry(SCHEME_FIELD, "ftp"), Map.entry(HOSTNAME_FIELD, "localhost"), Map.entry(PORT_FIELD, 1234)),
"test-id",
"test-node-id"
);

PlainActionFuture<WorkflowData> future = httpHostStep.execute(
badSchemeData.getNodeId(),
badSchemeData,
Collections.emptyMap(),
Collections.emptyMap()
);

assertTrue(future.isDone());
ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get());
assertEquals(FlowFrameworkException.class, ex.getCause().getClass());
assertEquals("http_host scheme must be http or https", ex.getCause().getMessage());
}

public void testBadPort() {
HttpHostStep httpHostStep = new HttpHostStep();

WorkflowData badPortData = new WorkflowData(
Map.ofEntries(Map.entry(SCHEME_FIELD, "https"), Map.entry(HOSTNAME_FIELD, "localhost"), Map.entry(PORT_FIELD, 123456)),
"test-id",
"test-node-id"
);

PlainActionFuture<WorkflowData> future = httpHostStep.execute(
badPortData.getNodeId(),
badPortData,
Collections.emptyMap(),
Collections.emptyMap()
);

assertTrue(future.isDone());
ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get());
assertEquals(FlowFrameworkException.class, ex.getCause().getClass());
assertEquals("http_host port number must be between 0 and 65535", ex.getCause().getMessage());
}

public void testNoParsePort() {
HttpHostStep httpHostStep = new HttpHostStep();

WorkflowData noParsePortData = new WorkflowData(
Map.ofEntries(Map.entry(SCHEME_FIELD, "https"), Map.entry(HOSTNAME_FIELD, "localhost"), Map.entry(PORT_FIELD, "doesn't parse")),
"test-id",
"test-node-id"
);

PlainActionFuture<WorkflowData> future = httpHostStep.execute(
noParsePortData.getNodeId(),
noParsePortData,
Collections.emptyMap(),
Collections.emptyMap()
);

assertTrue(future.isDone());
ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get());
assertEquals(FlowFrameworkException.class, ex.getCause().getClass());
assertEquals("http_host port must be a number between 0 and 65535", ex.getCause().getMessage());
}
}

0 comments on commit 172f47a

Please sign in to comment.