Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Implement multi tenancy in Flow Framework (#980) #1024

Merged
merged 1 commit into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,22 @@ jobs:
- name: Build and Run Tests
run: |
./gradlew integTest -PnumNodes=3
integTenantAwareTest:
needs: [spotless, javadoc]
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
java: [21]
name: Tenant Aware Integ Test JDK${{ matrix.java }}, ${{ matrix.os }}
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java }}
distribution: temurin
- name: Build and Run Tests
run: |
./gradlew integTest "-Dtests.rest.tenantaware=true"
1 change: 0 additions & 1 deletion .github/workflows/test_security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ jobs:
steps:
- name: Run start commands
run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}

- name: Checkout Flow Framework
uses: actions/checkout@v4
- name: Setup Java ${{ matrix.java }}
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.17...2.x)
### Features
- Add multitenant remote metadata client ([#980](https://github.com/opensearch-project/flow-framework/pull/980))
- Add synchronous execution option to workflow provisioning ([#990](https://github.com/opensearch-project/flow-framework/pull/990))
- Add ApiSpecFetcher for Fetching and Comparing API Specifications ([#651](https://github.com/opensearch-project/flow-framework/issues/651))
- Add optional config field to tool step ([#899](https://github.com/opensearch-project/flow-framework/pull/899))
Expand Down
50 changes: 43 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,18 @@ configurations {

dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.11.4'
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
api group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}"
implementation "org.apache.commons:commons-lang3:${versions.commonslang}"
api(group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}") {
exclude group: "org.apache.httpcomponents.client5", module: "httpclient5"
}
api group: 'org.slf4j', name: 'slf4j-api', version: '1.7.36'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.17.0'
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "com.amazonaws:aws-encryption-sdk-java:3.0.1"
implementation "software.amazon.cryptography:aws-cryptographic-material-providers:1.8.0"
implementation "org.dafny:DafnyRuntime:4.9.1"
implementation "software.amazon.smithy.dafny:conversion:0.1.1"
implementation 'org.bouncycastle:bcprov-jdk18on:1.78.1'
implementation 'org.bouncycastle:bcprov-jdk18on:1.80'
implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
implementation "org.glassfish:jakarta.json:2.0.1"
implementation "org.eclipse:yasson:3.0.4"
Expand All @@ -188,7 +189,11 @@ dependencies {
implementation "io.swagger.parser.v3:swagger-parser-core:${swaggerVersion}"
implementation "io.swagger.parser.v3:swagger-parser:${swaggerVersion}"
implementation "io.swagger.parser.v3:swagger-parser-v3:${swaggerVersion}"
// Declare Jackson dependencies for tests (from OpenSearch version catalog)
// Multi-tenant SDK Client
implementation ("org.opensearch:opensearch-remote-metadata-sdk:${opensearch_build}") {
exclude group: "org.apache.httpcomponents.client5", module: "httpclient5"
}
testImplementation 'org.junit.jupiter:junit-jupiter:5.11.4'
testImplementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
testImplementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
testImplementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson_databind}"
Expand All @@ -202,7 +207,6 @@ dependencies {
configurations.all {
resolutionStrategy {
force("com.google.guava:guava:33.4.0-jre") // CVE for 31.1, keep to force transitive dependencies
force("org.apache.httpcomponents.core5:httpcore5:5.3.2") // Dependency Jar Hell
}
}
}
Expand Down Expand Up @@ -262,10 +266,19 @@ integTest {
systemProperty('user', user)
systemProperty('password', password)

// Only tenant aware test if set
if (System.getProperty("tests.rest.tenantaware") == "true") {
filter {
includeTestsMatching "org.opensearch.flowframework.*TenantAwareIT"
}
systemProperty "plugins.flow_framework.multi_tenancy_enabled", "true"
}

// Only rest case can run with remote cluster
if (System.getProperty("tests.rest.cluster") != null) {
if (System.getProperty("tests.rest.cluster") != null && System.getProperty("tests.rest.tenantaware") == null) {
filter {
includeTestsMatching "org.opensearch.flowframework.rest.*IT"
excludeTestsMatching "org.opensearch.flowframework.rest.*TenantAwareIT"
}
}

Expand All @@ -288,11 +301,34 @@ integTest {
filter {
includeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkSecureRestApiIT"
excludeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkRestApiIT"
excludeTestsMatching "org.opensearch.flowframework.rest.*TenantAwareIT"
}
}

// doFirst delays this block until execution time
doFirst {
if (System.getProperty("tests.rest.tenantaware") == "true") {
def ymlFile = file("$buildDir/testclusters/integTest-0/config/opensearch.yml")
if (ymlFile.exists()) {
ymlFile.withWriterAppend {
writer ->
writer.write("\n# Set multitenancy\n")
writer.write("plugins.flow_framework.multi_tenancy_enabled: true\n")
}
// TODO this properly uses the remote client factory but needs a remote cluster set up
// TODO get the endpoint from a system property
if (System.getProperty("tests.rest.cluster") != null) {
ymlFile.withWriterAppend { writer ->
writer.write("\n# Use a remote cluster\n")
writer.write("plugins.flow_framework.remote_metadata_type: RemoteOpenSearch\n")
writer.write("plugins.flow_framework.remote_metadata_endpoint: https://127.0.0.1:9200\n")
}
}
} else {
throw new GradleException("opensearch.yml not found at: $ymlFile")
}
}

// Tell the test JVM if the cluster JVM is running under a debugger so that tests can
// use longer timeouts for requests.
def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.remote.metadata.client.impl.SdkClientFactory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
Expand All @@ -75,22 +77,36 @@
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX;
import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.TENANT_ID_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_ENDPOINT;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_REGION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_SERVICE_NAME;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_TYPE;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_ENDPOINT_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_REGION_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_SERVICE_NAME_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.TENANT_ID_FIELD_KEY;

/**
* An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
Expand Down Expand Up @@ -121,9 +137,28 @@
Settings settings = environment.settings();
flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client, xContentRegistry);
SdkClient sdkClient = SdkClientFactory.createSdkClient(
client,
xContentRegistry,
// Here we assume remote metadata client is only used with tenant awareness.
// This may change in the future allowing more options for this map
FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED.get(settings)
? Map.ofEntries(
Map.entry(REMOTE_METADATA_TYPE_KEY, REMOTE_METADATA_TYPE.get(settings)),
Map.entry(REMOTE_METADATA_ENDPOINT_KEY, REMOTE_METADATA_ENDPOINT.get(settings)),
Map.entry(REMOTE_METADATA_REGION_KEY, REMOTE_METADATA_REGION.get(settings)),
Map.entry(REMOTE_METADATA_SERVICE_NAME_KEY, REMOTE_METADATA_SERVICE_NAME.get(settings)),
Map.entry(TENANT_AWARE_KEY, "true"),
Map.entry(TENANT_ID_FIELD_KEY, TENANT_ID_FIELD)

Check warning on line 152 in src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java#L146-L152

Added lines #L146 - L152 were not covered by tests
)
: Collections.emptyMap(),
// TODO: Find a better thread pool or make one
client.threadPool().executor(ThreadPool.Names.GENERIC)
);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client, sdkClient, xContentRegistry);
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(
client,
sdkClient,
clusterService,
encryptorUtils,
xContentRegistry
Expand All @@ -137,15 +172,22 @@
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool, flowFrameworkSettings);

SearchHandler searchHandler = new SearchHandler(settings, clusterService, client, FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES);
SearchHandler searchHandler = new SearchHandler(
settings,
clusterService,
client,
sdkClient,
FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES
);

return List.of(
workflowStepFactory,
workflowProcessSorter,
encryptorUtils,
flowFrameworkIndicesHandler,
searchHandler,
flowFrameworkSettings
flowFrameworkSettings,
sdkClient
);
}

Expand Down Expand Up @@ -196,7 +238,12 @@
MAX_WORKFLOW_STEPS,
WORKFLOW_REQUEST_TIMEOUT,
TASK_REQUEST_RETRY_DURATION,
FILTER_BY_BACKEND_ROLES
FILTER_BY_BACKEND_ROLES,
FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED,
REMOTE_METADATA_TYPE,
REMOTE_METADATA_ENDPOINT,
REMOTE_METADATA_REGION,
REMOTE_METADATA_SERVICE_NAME
);
}

Expand All @@ -206,21 +253,21 @@
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
PROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
Math.max(8, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
DEPROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*/
package org.opensearch.flowframework.common;

import org.opensearch.Version;

/**
* Representation of common values that are used across project
*/
Expand Down Expand Up @@ -82,6 +84,10 @@ private CommonValue() {}
public static final String USE_CASE = "use_case";
/** The param name for reprovisioning, used by the create workflow API */
public static final String REPROVISION_WORKFLOW = "reprovision";
/** The REST header containing the tenant id */
public static final String TENANT_ID_HEADER = "x-tenant-id";
/** The field name containing the tenant id */
public static final String TENANT_ID_FIELD = "tenant_id";

/*
* Constants associated with plugin configuration
Expand Down Expand Up @@ -244,4 +250,9 @@ private CommonValue() {}
public static final String ML_COMMONS_API_SPEC_YAML_URI =
"https://raw.githubusercontent.com/opensearch-project/opensearch-api-specification/refs/heads/main/spec/namespaces/ml.yaml";

/*
* Constants associated with non-BWC features
*/
/** Version 2.19.0 */
public static final Version VERSION_2_19_0 = Version.fromString("2.19.0");
}
Loading
Loading