Merge pull request #212 from awslabs/feature/add-existingOpensearchServerlessCollection

feat: implement opensearch serverless support
hvital authored Jan 25, 2024
2 parents a4df397 + dd4c5d3 commit 4b6575d
Showing 15 changed files with 685 additions and 137 deletions.
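Taken together, the change set lets the RAG ingestion and QA patterns target an Amazon OpenSearch Serverless collection instead of a provisioned domain. A hedged sketch of what that enables, written against the library's Python (jsii) bindings — the package import path, construct IDs, collection name, and index name are illustrative assumptions, not taken from this commit:

```python
from aws_cdk import Stack
from aws_cdk import aws_cognito as cognito
from aws_cdk import aws_opensearchserverless as opensearchserverless
from constructs import Construct
from cdklabs.generative_ai_cdk_constructs import QaAppsyncOpensearch

class QaStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Illustrative serverless collection. A real deployment also needs
        # encryption/network/data-access policies, which are omitted here.
        collection = opensearchserverless.CfnCollection(
            self, "RagCollection",
            name="rag-collection",
            type="VECTORSEARCH",
        )

        user_pool = cognito.UserPool(self, "UserPool")

        # New with this PR: pass a serverless collection in place of a domain;
        # exactly one of the two OpenSearch props may be set.
        QaAppsyncOpensearch(
            self, "QaWithAoss",
            existing_opensearch_serverless_collection=collection,
            open_search_index_name="doc-index",
            cognito_user_pool=user_pool,
        )
```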
@@ -166,6 +166,7 @@ def run_qa_agent_rag_no_memory(input_params):
    if _doc_index is None:
        logger.info("loading opensearch retriever")
        doc_index = load_vector_db_opensearch(boto3.Session().region_name,
                                              os.environ.get('OPENSEARCH_API_NAME'),
                                              os.environ.get('OPENSEARCH_DOMAIN_ENDPOINT'),
                                              os.environ.get('OPENSEARCH_INDEX'),
                                              os.environ.get('OPENSEARCH_SECRET_ID'))
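The first hunk shows the QA Lambda threading a new OPENSEARCH_API_NAME setting through to the retriever loader. A minimal sketch of wiring these environment variables up for a local test, where every value is an illustrative assumption rather than something from the commit:

```python
import os

# Illustrative values only; endpoint, index, and secret id are assumptions.
os.environ["OPENSEARCH_API_NAME"] = "aoss"  # "es" when targeting a managed domain
os.environ["OPENSEARCH_DOMAIN_ENDPOINT"] = "https://abc123.us-east-1.aoss.amazonaws.com"
os.environ["OPENSEARCH_INDEX"] = "rag-index"
os.environ["OPENSEARCH_SECRET_ID"] = "opensearch-credentials"  # the loader falls back to SigV4 without one
```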
@@ -72,13 +72,7 @@ def get_message(self):
    session_token=credentials.token,
)

aws_auth_os = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    aws_region,
    'es',
    session_token=credentials.token,
)


def get_credentials(secret_id: str, region_name: str) -> str:
    client = boto3.client('secretsmanager', region_name=region_name)
@@ -93,6 +87,7 @@ def get_credentials_string(secret_id: str, region_name: str) -> str:
    return secrets_value

def load_vector_db_opensearch(region: str,
                              opensearch_api_name: str,
                              opensearch_domain_endpoint: str,
                              opensearch_index: str,
                              secret_id: str) -> OpenSearchVectorSearch:
@@ -105,8 +100,13 @@ def load_vector_db_opensearch(region: str,
        creds = get_credentials(secret_id, aws_region)
        http_auth = (creds['username'], creds['password'])
    else: # sigv4
        http_auth = aws_auth_os

        http_auth = AWS4Auth(
            credentials.access_key,
            credentials.secret_key,
            aws_region,
            opensearch_api_name,
            session_token=credentials.token,
        )
    embedding_function = get_embeddings_llm()

    vector_db = OpenSearchVectorSearch(index_name=opensearch_index,
@@ -115,8 +115,7 @@ def load_vector_db_opensearch(region: str,
                                       http_auth=http_auth,
                                       use_ssl = True,
                                       verify_certs = True,
                                       connection_class = RequestsHttpConnection,
                                       is_aoss=False)
                                       connection_class = RequestsHttpConnection)
    print(f"returning handle to OpenSearchVectorSearch, vector_db={vector_db}")
    return vector_db
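The substantive change in this helper is that the SigV4 signer's service name is no longer hardcoded to 'es'; the caller supplies it, so the same code path signs requests for a managed domain ('es') or a Serverless collection ('aoss'), which rejects requests signed for the wrong service. A minimal standalone sketch of that signing setup:

```python
import boto3
from requests_aws4auth import AWS4Auth

def sigv4_auth(service_name: str, region: str) -> AWS4Auth:
    """Build a SigV4 signer for 'es' (managed domain) or 'aoss' (serverless)."""
    credentials = boto3.Session().get_credentials()
    return AWS4Auth(
        credentials.access_key,
        credentials.secret_key,
        region,
        service_name,  # the value this commit stops hardcoding to 'es'
        session_token=credentials.token,
    )
```

Dropping the explicit `is_aoss=False` flag in the same hunk presumably lets langchain infer the target from the signer rather than from a hardcoded switch.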

144 changes: 98 additions & 46 deletions lambda/aws-rag-appsync-stepfn-opensearch/embeddings_job/src/lambda.py
@@ -22,6 +22,7 @@
from helpers.update_ingestion_status import updateIngestionJobStatus
from langchain.embeddings import BedrockEmbeddings
from helpers.s3inmemoryloader import S3TxtFileLoaderInMemory
from opensearchpy import OpenSearch, RequestsHttpConnection
from langchain.vectorstores import OpenSearchVectorSearch
from langchain.text_splitter import RecursiveCharacterTextSplitter
import multiprocessing as mp
@@ -75,6 +76,7 @@ def get_bedrock_client(service_name="bedrock-runtime"):
bucket_name = os.environ['OUTPUT_BUCKET']
opensearch_index = os.environ['OPENSEARCH_INDEX']
opensearch_domain = os.environ['OPENSEARCH_DOMAIN_ENDPOINT']
opensearch_api_name = os.environ['OPENSEARCH_API_NAME']

DATA_DIR = tempfile.gettempdir()
CHUNCK_SIZE_DOC_SPLIT=500
@@ -85,6 +87,97 @@ def get_bedrock_client(service_name="bedrock-runtime"):
PROCESS_COUNT=5
INDEX_FILE="index_file"

def process_documents_in_es(index_exists, shards, http_auth):
    bedrock_client = get_bedrock_client()
    embeddings = BedrockEmbeddings(client=bedrock_client)

    if index_exists is False:
        # create an index if the create index hint file exists
        path = os.path.join(DATA_DIR, INDEX_FILE)
        if os.path.isfile(path) is True:
            print(f"index {opensearch_index} does not exist but {path} file is present so will create index")
            # by default langchain would create a k-NN index and the embeddings would be ingested as a k-NN vector type
            docsearch = OpenSearchVectorSearch.from_documents(index_name=opensearch_index,
                                                              documents=shards[0],
                                                              embedding=embeddings,
                                                              opensearch_url=opensearch_domain,
                                                              http_auth=http_auth)
            # we now need to start the loop below for the second shard
            shard_start_index = 1
        else:
            print(f"index {opensearch_index} does not exist and {path} file is not present, "
                  f"will wait for some other node to create the index")
            shard_start_index = 0
            # start a loop to wait for index creation by another node
            time_slept = 0
            while True:
                print(f"index {opensearch_index} still does not exist, sleeping...")
                time.sleep(PER_ITER_SLEEP_TIME)
                index_exists = check_if_index_exists(opensearch_index,
                                                     aws_region,
                                                     opensearch_domain,
                                                     http_auth)
                if index_exists is True:
                    print(f"index {opensearch_index} now exists")
                    break
                time_slept += PER_ITER_SLEEP_TIME
                if time_slept >= TOTAL_INDEX_CREATION_WAIT_TIME:
                    print(f"time_slept={time_slept} >= {TOTAL_INDEX_CREATION_WAIT_TIME}, not waiting anymore for index creation")
                    break

    else:
        print(f"index={opensearch_index} already exists, going to call add_documents")
        shard_start_index = 0

    for shard in shards[shard_start_index:]:
        results = process_shard(shard=shard,
                                os_index_name=opensearch_index,
                                os_domain_ep=opensearch_domain,
                                os_http_auth=http_auth)

def process_documents_in_aoss(index_exists, shards, http_auth):
    if index_exists is False:
        vector_db = OpenSearch(
            hosts = [{'host': opensearch_domain.replace("https://", ""), 'port': 443}],
            http_auth = http_auth,
            use_ssl = True,
            verify_certs = True,
            connection_class = RequestsHttpConnection
        )
        index_body = {
            'settings': {
                "index.knn": True
            },
            "mappings": {
                "properties": {
                    "osha_vector": {
                        "type": "knn_vector",
                        "dimension": 1536,
                        "method": {
                            "engine": "faiss",
                            "name": "hnsw",
                            "space_type": "l2"
                        }
                    }
                }
            }
        }
        response = vector_db.indices.create(opensearch_index, body=index_body)
        print(response)

    print(f"index={opensearch_index} Adding Documents")
    bedrock_client = get_bedrock_client()
    embeddings = BedrockEmbeddings(client=bedrock_client, model_id="amazon.titan-embed-text-v1")
    docsearch = OpenSearchVectorSearch(index_name=opensearch_index,
                                       embedding_function=embeddings,
                                       opensearch_url=opensearch_domain,
                                       http_auth=http_auth,
                                       use_ssl = True,
                                       verify_certs = True,
                                       connection_class = RequestsHttpConnection)
    for shard in shards:
        docsearch.add_documents(documents=shard)

@logger.inject_lambda_context(log_event=True)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
@@ -100,7 +193,7 @@ def handler(event, context: LambdaContext) -> dict:
        credentials.access_key,
        credentials.secret_key,
        aws_region,
        opensearch_api_name,
        session_token=credentials.token
    )
    job_id = event[0]['s3_transformer_result']['Payload']['jobid']
@@ -171,52 +264,11 @@ def handler(event, context: LambdaContext) -> dict:
'status':'failed'
}

    bedrock_client = get_bedrock_client()
    embeddings = BedrockEmbeddings(client=bedrock_client)
    if opensearch_api_name == "es":
        process_documents_in_es(index_exists, shards, http_auth)
    elif opensearch_api_name == "aoss":
        process_documents_in_aoss(index_exists, shards, http_auth)

    if index_exists is False:
        # create an index if the create index hint file exists
        path = os.path.join(DATA_DIR, INDEX_FILE)
        if os.path.isfile(path) is True:
            print(f"index {opensearch_index} does not exist but {path} file is present so will create index")
            # by default langchain would create a k-NN index and the embeddings would be ingested as a k-NN vector type
            docsearch = OpenSearchVectorSearch.from_documents(index_name=opensearch_index,
                                                              documents=shards[0],
                                                              embedding=embeddings,
                                                              opensearch_url=opensearch_domain,
                                                              http_auth=http_auth)
            # we now need to start the loop below for the second shard
            shard_start_index = 1
        else:
            print(f"index {opensearch_index} does not exist and {path} file is not present, "
                  f"will wait for some other node to create the index")
            shard_start_index = 0
            # start a loop to wait for index creation by another node
            time_slept = 0
            while True:
                print(f"index {opensearch_index} still does not exist, sleeping...")
                time.sleep(PER_ITER_SLEEP_TIME)
                index_exists = check_if_index_exists(opensearch_index,
                                                     aws_region,
                                                     opensearch_domain,
                                                     http_auth)
                if index_exists is True:
                    print(f"index {opensearch_index} now exists")
                    break
                time_slept += PER_ITER_SLEEP_TIME
                if time_slept >= TOTAL_INDEX_CREATION_WAIT_TIME:
                    print(f"time_slept={time_slept} >= {TOTAL_INDEX_CREATION_WAIT_TIME}, not waiting anymore for index creation")
                    break

    else:
        print(f"index={opensearch_index} does exists, going to call add_documents")
        shard_start_index = 0

    for shard in shards[shard_start_index:]:
        results = process_shard(shard=shard,
                                os_index_name=opensearch_index,
                                os_domain_ep=opensearch_domain,
                                os_http_auth=http_auth)

    for file in files:
        if file['status'] == 'File transformed':
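Both processing paths rely on a `check_if_index_exists` helper that the excerpt calls but never shows. One plausible implementation under the same connection settings used above — a sketch matching the call sites, not the repository's actual helper:

```python
from opensearchpy import OpenSearch, RequestsHttpConnection

def check_if_index_exists(index_name: str, region: str, host: str, http_auth) -> bool:
    # region mirrors the call site's signature; it is unused in this sketch.
    client = OpenSearch(
        hosts=[{'host': host.replace("https://", ""), 'port': 443}],
        http_auth=http_auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
    )
    return bool(client.indices.exists(index_name))
```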
6 changes: 3 additions & 3 deletions src/common/helpers/appsyncmergedapi-helper.ts
@@ -149,10 +149,10 @@ export function setMergedApiRole(mergedApiID: String, sourceApiId: String, merge
actions: ['appsync:SourceGraphQL',
'appsync:StartSchemaMerge'],
resources: [
'arn:aws:appsync:' + Aws.REGION + ':' + Aws.ACCOUNT_ID
'arn:' + Aws.PARTITION + ':appsync:' + Aws.REGION + ':' + Aws.ACCOUNT_ID
+ ':apis/' + sourceApiId + '/*',
'arn:aws:appsync:'+ Aws.REGION+':'+Aws.ACCOUNT_ID+':apis/'+mergedApiID+'/sourceApiAssociations/*',
'arn:aws:appsync:'+ Aws.REGION+':'+Aws.ACCOUNT_ID+':apis/'+sourceApiId+'/sourceApiAssociations/*',
'arn:' + Aws.PARTITION + ':appsync:' + Aws.REGION + ':' + Aws.ACCOUNT_ID + ':apis/' + mergedApiID + '/sourceApiAssociations/*',
'arn:' + Aws.PARTITION + ':appsync:' + Aws.REGION + ':' + Aws.ACCOUNT_ID + ':apis/' + sourceApiId + '/sourceApiAssociations/*',
],
}),
);
70 changes: 70 additions & 0 deletions src/common/helpers/opensearch-helper.ts
@@ -0,0 +1,70 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
* with the License. A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES
* OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
import * as openSearchServerless from 'aws-cdk-lib/aws-opensearchserverless';
import * as opensearchservice from 'aws-cdk-lib/aws-opensearchservice';

export interface OpenSearchProps {
/**
* Optional existing Amazon OpenSearch Service domain.
*
* @default - None
*/
readonly existingOpensearchDomain?: opensearchservice.IDomain;
/**
* Optional existing Amazon OpenSearch Serverless collection.
*
* @default - None
*/
readonly existingOpensearchServerlessCollection?: openSearchServerless.CfnCollection;
}

export function CheckOpenSearchProps(propsObject: OpenSearchProps | any) {
let errorMessages = '';
let errorFound = false;

if (propsObject.existingOpensearchDomain && propsObject.existingOpensearchServerlessCollection) {
errorMessages += 'Error - Provide either existingOpensearchDomain or existingOpensearchServerlessCollection, but not both.\n';
errorFound = true;
}

if (!propsObject.existingOpensearchDomain && !propsObject.existingOpensearchServerlessCollection) {
errorMessages += 'Error - One of existingOpensearchDomain or existingOpensearchServerlessCollection must be provided.\n';
errorFound = true;
}

if (errorFound) {
throw new Error(errorMessages);
}
}

export function getOpenSearchApiName(propsObject: OpenSearchProps) {
if (propsObject.existingOpensearchDomain) {
return 'es';

} else if (propsObject.existingOpensearchServerlessCollection) {
return 'aoss';
}

throw new Error('OpenSearch resource not defined.');
}

export function getOpenSearchEndpoint(propsObject: OpenSearchProps) {
if (propsObject.existingOpensearchDomain) {
return propsObject.existingOpensearchDomain.domainEndpoint;

} else if (propsObject.existingOpensearchServerlessCollection) {
return propsObject.existingOpensearchServerlessCollection.attrCollectionEndpoint;
}

throw new Error('OpenSearch resource not defined.');
}
4 changes: 2 additions & 2 deletions src/patterns/gen-ai/aws-langchain-common-layer/README.md
@@ -62,8 +62,8 @@ const lambdaRuntime = lambda.Runtime.PYTHON_3_10;
// This is one way of getting a lambda powertools layer
const powerToolsArn =
lambdaArchitecture === lambda.Architecture.X86_64
? `arn:aws:lambda:${cdk.Aws.REGION}:017000801446:layer:AWSLambdaPowertoolsPythonV2:42`
: `arn:aws:lambda:${cdk.Aws.REGION}:017000801446:layer:AWSLambdaPowertoolsPythonV2-Arm64:42`;
? `arn:${cdk.Aws.PARTITION}:lambda:${cdk.Aws.REGION}:017000801446:layer:AWSLambdaPowertoolsPythonV2:42`
: `arn:${cdk.Aws.PARTITION}:lambda:${cdk.Aws.REGION}:017000801446:layer:AWSLambdaPowertoolsPythonV2-Arm64:42`;

const lambdaDepsLayer = new LangchainCommonDepsLayer(this, 'lambdagenaidepslayer', {
runtime: lambdaRuntime,
10 changes: 7 additions & 3 deletions src/patterns/gen-ai/aws-qa-appsync-opensearch/README.md
@@ -56,14 +56,14 @@ Typescript

``` typescript
import { Construct } from 'constructs';
import { Stack, StackProps } from 'aws-cdk-lib';
import { Stack, StackProps, Aws } from 'aws-cdk-lib';
import * as os from 'aws-cdk-lib/aws-opensearchservice';
import * as cognito from 'aws-cdk-lib/aws-cognito';
import { QaAppsyncOpensearch, QaAppsyncOpensearchProps } from '@cdklabs/generative-ai-cdk-constructs';

// get an existing OpenSearch provisioned cluster
const osDomain = os.Domain.fromDomainAttributes(this, 'osdomain', {
domainArn: 'arn:aws:es:us-east-1:XXXXXX',
domainArn: 'arn:' + Aws.PARTITION + ':es:us-east-1:XXXXXX',
domainEndpoint: 'https://XXXXX.us-east-1.es.amazonaws.com'
});

@@ -180,9 +180,13 @@ Parameters

## Pattern Construct Props

> **Note:** Either ```existingOpensearchDomain``` or ```existingOpensearchServerlessCollection``` must be specified, but not both.

| **Name** | **Type** | **Required** |**Description** |
|:-------------|:----------------|-----------------|-----------------|
| existingOpenSearchDomain | [opensearchservice.IDomain](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_opensearchservice.IDomain.html)| ![Required](https://img.shields.io/badge/required-ff0000) | Existing domain for the OpenSearch Service. |
| existingOpensearchDomain | [opensearchservice.IDomain](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_opensearchservice.IDomain.html)| ![Optional](https://img.shields.io/badge/optional-4169E1) | Existing domain for the OpenSearch Service. **Mutually exclusive** with ```existingOpensearchServerlessCollection``` - only one should be specified. |
| existingOpensearchServerlessCollection | [openSearchServerless.CfnCollection](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-opensearchserverless-collection.html)| ![Optional](https://img.shields.io/badge/optional-4169E1) | Existing Amazon OpenSearch Serverless collection. **Mutually exclusive** with ```existingOpensearchDomain``` - only one should be specified. |
| openSearchIndexName | string | ![Required](https://img.shields.io/badge/required-ff0000) | Index name for the Amazon OpenSearch Service. If it doesn't exist, the pattern will create the index in the cluster. |
| cognitoUserPool | [cognito.IUserPool](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_cognito.IUserPool.html) | ![Required](https://img.shields.io/badge/required-ff0000) | Cognito user pool used for authentication. |
| openSearchSecret | [secret.ISecret](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_secretsmanager.ISecret.html) | ![Optional](https://img.shields.io/badge/optional-4169E1) | Optional. Secret containing credentials to authenticate to the existing Amazon OpenSearch domain if fine grain control access is configured. If not provided, the Lambda function will use [AWS Signature Version 4](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-signing.html). |
Expand Down