From f719c7e70eaf5a8cce4595de5f6794ad5a5073ef Mon Sep 17 00:00:00 2001 From: Aarushi <50577581+aarushik93@users.noreply.github.com> Date: Fri, 8 Nov 2024 11:13:55 -0600 Subject: [PATCH] feat(blocks): Pinecone blocks (#8535) * update pinecone * update blocks * fix linting * update test * update requests * mock funciton --- .../backend/backend/blocks/pinecone.py | 123 ++++++++++++++++-- 1 file changed, 110 insertions(+), 13 deletions(-) diff --git a/autogpt_platform/backend/backend/blocks/pinecone.py b/autogpt_platform/backend/backend/blocks/pinecone.py index 91364fce92f0..5ef8e639920f 100644 --- a/autogpt_platform/backend/backend/blocks/pinecone.py +++ b/autogpt_platform/backend/backend/blocks/pinecone.py @@ -1,4 +1,5 @@ -from typing import Literal +import uuid +from typing import Any, Literal from autogpt_libs.supabase_integration_credentials_store import APIKeyCredentials from pinecone import Pinecone, ServerlessSpec @@ -98,10 +99,14 @@ class Input(BlockSchema): include_metadata: bool = SchemaField( description="Whether to include metadata in the response", default=True ) - host: str = SchemaField(description="Host for pinecone") + host: str = SchemaField(description="Host for pinecone", default="") + idx_name: str = SchemaField(description="Index name for pinecone") class Output(BlockSchema): - results: dict = SchemaField(description="Query results from Pinecone") + results: Any = SchemaField(description="Query results from Pinecone") + combined_results: Any = SchemaField( + description="Combined results from Pinecone" + ) def __init__(self): super().__init__( @@ -119,13 +124,105 @@ def run( credentials: APIKeyCredentials, **kwargs, ) -> BlockOutput: - pc = Pinecone(api_key=credentials.api_key.get_secret_value()) - idx = pc.Index(host=input_data.host) - results = idx.query( - namespace=input_data.namespace, - vector=input_data.query_vector, - top_k=input_data.top_k, - include_values=input_data.include_values, - include_metadata=input_data.include_metadata, - ) - yield "results", results + try: + # Create a new client instance + pc = Pinecone(api_key=credentials.api_key.get_secret_value()) + + # Get the index + idx = pc.Index(input_data.idx_name) + + # Ensure query_vector is in correct format + query_vector = input_data.query_vector + if isinstance(query_vector, list) and len(query_vector) > 0: + if isinstance(query_vector[0], list): + query_vector = query_vector[0] + + results = idx.query( + namespace=input_data.namespace, + vector=query_vector, + top_k=input_data.top_k, + include_values=input_data.include_values, + include_metadata=input_data.include_metadata, + ).to_dict() + combined_text = "" + if results["matches"]: + texts = [ + match["metadata"]["text"] + for match in results["matches"] + if match.get("metadata", {}).get("text") + ] + combined_text = "\n\n".join(texts) + + # Return both the raw matches and combined text + yield "results", { + "matches": results["matches"], + "combined_text": combined_text, + } + yield "combined_results", combined_text + + except Exception as e: + error_msg = f"Error querying Pinecone: {str(e)}" + raise RuntimeError(error_msg) from e + + +class PineconeInsertBlock(Block): + class Input(BlockSchema): + credentials: PineconeCredentialsInput = PineconeCredentialsField() + index: str = SchemaField(description="Initialized Pinecone index") + chunks: list = SchemaField(description="List of text chunks to ingest") + embeddings: list = SchemaField( + description="List of embeddings corresponding to the chunks" + ) + namespace: str = SchemaField( + description="Namespace to use in Pinecone", default="" + ) + metadata: dict = SchemaField( + description="Additional metadata to store with each vector", default={} + ) + + class Output(BlockSchema): + upsert_response: str = SchemaField( + description="Response from Pinecone upsert operation" + ) + + def __init__(self): + super().__init__( + id="477f2168-cd91-475a-8146-9499a5982434", + description="Upload data to a Pinecone index", + categories={BlockCategory.LOGIC}, + input_schema=PineconeInsertBlock.Input, + output_schema=PineconeInsertBlock.Output, + ) + + def run( + self, + input_data: Input, + *, + credentials: APIKeyCredentials, + **kwargs, + ) -> BlockOutput: + try: + # Create a new client instance + pc = Pinecone(api_key=credentials.api_key.get_secret_value()) + + # Get the index + idx = pc.Index(input_data.index) + + vectors = [] + for chunk, embedding in zip(input_data.chunks, input_data.embeddings): + vector_metadata = input_data.metadata.copy() + vector_metadata["text"] = chunk + vectors.append( + { + "id": str(uuid.uuid4()), + "values": embedding, + "metadata": vector_metadata, + } + ) + idx.upsert(vectors=vectors, namespace=input_data.namespace) + + yield "upsert_response", "successfully upserted" + + except Exception as e: + error_msg = f"Error uploading to Pinecone: {str(e)}" + raise RuntimeError(error_msg) from e