diff --git a/backend/onyx/background/celery/tasks/shared/RetryDocumentIndex.py b/backend/onyx/background/celery/tasks/shared/RetryDocumentIndex.py
index 62a34196f60..34a3e0a8864 100644
--- a/backend/onyx/background/celery/tasks/shared/RetryDocumentIndex.py
+++ b/backend/onyx/background/celery/tasks/shared/RetryDocumentIndex.py
@@ -28,13 +28,35 @@ def __init__(self, index: DocumentIndex):
         wait=wait_random_exponential(multiplier=1, max=MAX_WAIT),
         stop=stop_after_delay(STOP_AFTER),
     )
-    def delete_single(self, doc_id: str) -> int:
-        return self.index.delete_single(doc_id)
+    def delete_single(
+        self,
+        doc_id: str,
+        *,
+        tenant_id: str | None,
+        chunk_count: int | None,
+    ) -> int:
+        return self.index.delete_single(
+            doc_id,
+            tenant_id=tenant_id,
+            chunk_count=chunk_count,
+        )
 
     @retry(
         retry=retry_if_exception_type(httpx.ReadTimeout),
         wait=wait_random_exponential(multiplier=1, max=MAX_WAIT),
         stop=stop_after_delay(STOP_AFTER),
     )
-    def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
-        return self.index.update_single(doc_id, fields)
+    def update_single(
+        self,
+        doc_id: str,
+        *,
+        tenant_id: str | None,
+        chunk_count: int | None,
+        fields: VespaDocumentFields,
+    ) -> int:
+        return self.index.update_single(
+            doc_id,
+            tenant_id=tenant_id,
+            chunk_count=chunk_count,
+            fields=fields,
+        )
diff --git a/backend/onyx/background/celery/tasks/shared/tasks.py b/backend/onyx/background/celery/tasks/shared/tasks.py
index 03a7ab83701..b078c282b58 100644
--- a/backend/onyx/background/celery/tasks/shared/tasks.py
+++ b/backend/onyx/background/celery/tasks/shared/tasks.py
@@ -12,6 +12,7 @@
 from onyx.configs.constants import OnyxCeleryTask
 from onyx.db.document import delete_document_by_connector_credential_pair__no_commit
 from onyx.db.document import delete_documents_complete__no_commit
+from onyx.db.document import fetch_chunk_count_for_document
 from onyx.db.document import get_document
 from onyx.db.document import get_document_connector_count
 from onyx.db.document import mark_document_as_modified
@@ -80,7 +81,13 @@ def document_by_cc_pair_cleanup_task(
             # delete it from vespa and the db
             action = "delete"
 
-            chunks_affected = retry_index.delete_single(document_id)
+            chunk_count = fetch_chunk_count_for_document(document_id, db_session)
+
+            chunks_affected = retry_index.delete_single(
+                document_id,
+                tenant_id=tenant_id,
+                chunk_count=chunk_count,
+            )
             delete_documents_complete__no_commit(
                 db_session=db_session,
                 document_ids=[document_id],
@@ -110,7 +117,12 @@ def document_by_cc_pair_cleanup_task(
             )
 
             # update Vespa. OK if doc doesn't exist. Raises exception otherwise.
-            chunks_affected = retry_index.update_single(document_id, fields=fields)
+            chunks_affected = retry_index.update_single(
+                document_id,
+                tenant_id=tenant_id,
+                chunk_count=doc.chunk_count,
+                fields=fields,
+            )
 
             # there are still other cc_pair references to the doc, so just resync to Vespa
             delete_document_by_connector_credential_pair__no_commit(
diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py
index c00bac354f3..8eabeb7d8a8 100644
--- a/backend/onyx/background/celery/tasks/vespa/tasks.py
+++ b/backend/onyx/background/celery/tasks/vespa/tasks.py
@@ -992,7 +992,12 @@ def vespa_metadata_sync_task(
         )

         # update Vespa. OK if doc doesn't exist. Raises exception otherwise.
-        chunks_affected = retry_index.update_single(document_id, fields)
+        chunks_affected = retry_index.update_single(
+            document_id,
+            tenant_id=tenant_id,
+            chunk_count=doc.chunk_count,
+            fields=fields,
+        )

         # update db last. Worst case = we crash right before this and
         # the sync might repeat again later
diff --git a/backend/onyx/db/document.py b/backend/onyx/db/document.py
index 7f11b64d824..d9ff82d797a 100644
--- a/backend/onyx/db/document.py
+++ b/backend/onyx/db/document.py
@@ -685,20 +685,27 @@ def get_document_sources(
 def fetch_chunk_counts_for_documents(
     document_ids: list[str],
     db_session: Session,
-) -> list[tuple[str, int | None]]:
+) -> list[tuple[str, int]]:
     """
     Return a list of (document_id, chunk_count) tuples.
-    Note: chunk_count might be None if not set in DB,
-    so we declare it as Optional[int].
+    If a document_id is not found in the database, it will be returned with a chunk_count of 0.
     """
     stmt = select(DbDocument.id, DbDocument.chunk_count).where(
         DbDocument.id.in_(document_ids)
     )

-    # results is a list of 'Row' objects, each containing two columns
     results = db_session.execute(stmt).all()

-    # If DbDocument.id is guaranteed to be a string, you can just do row.id;
-    # otherwise cast to str if you need to be sure it's a string:
-    return [(str(row[0]), row[1]) for row in results]
-    # or row.id, row.chunk_count if they are named attributes in your ORM model
+    # Create a dictionary of document_id to chunk_count
+    chunk_counts = {str(row.id): row.chunk_count or 0 for row in results}
+
+    # Return a list of tuples, using 0 for documents not found in the database
+    return [(doc_id, chunk_counts.get(doc_id, 0)) for doc_id in document_ids]
+
+
+def fetch_chunk_count_for_document(
+    document_id: str,
+    db_session: Session,
+) -> int | None:
+    stmt = select(DbDocument.chunk_count).where(DbDocument.id == document_id)
+    return db_session.execute(stmt).scalar_one_or_none()
diff --git a/backend/onyx/document_index/document_index_utils.py b/backend/onyx/document_index/document_index_utils.py
index 8976b556eb2..831ffb6d403 100644
--- a/backend/onyx/document_index/document_index_utils.py
+++ b/backend/onyx/document_index/document_index_utils.py
@@ -8,7 +8,7 @@
 from onyx.db.search_settings import get_secondary_search_settings
 from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
 from onyx.indexing.models import DocMetadataAwareIndexChunk
-
+from shared_configs.configs import MULTI_TENANT

 DEFAULT_BATCH_SIZE = 30
 DEFAULT_INDEX_NAME = "danswer_chunk"
@@ -37,7 +37,10 @@ def translate_boost_count_to_multiplier(boost: int) -> float:
     return 2 / (1 + math.exp(-1 * boost / 3))


-def assemble_document_chunk_info(
+# Assembles a list of Vespa chunk IDs for a document
+# given the required context. This can be used to directly query
+# Vespa's Document API.
+def get_document_chunk_ids(
     enriched_document_info_list: list[EnrichedDocumentIndexingInfo],
     tenant_id: str | None,
     large_chunks_enabled: bool,
@@ -110,10 +113,11 @@ def get_uuid_from_chunk_info(
         "large_" + str(large_chunk_id) if large_chunk_id is not None else str(chunk_id)
     )
     unique_identifier_string = "_".join([doc_str, chunk_index])
-    if tenant_id:
+    if tenant_id and MULTI_TENANT:
         unique_identifier_string += "_" + tenant_id

-    return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)
+    uuid_value = uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)
+    return uuid_value


 def get_uuid_from_chunk_info_old(
diff --git a/backend/onyx/document_index/interfaces.py b/backend/onyx/document_index/interfaces.py
index c97c87fd07e..08dbfdc9efb 100644
--- a/backend/onyx/document_index/interfaces.py
+++ b/backend/onyx/document_index/interfaces.py
@@ -109,7 +109,7 @@ class UpdateRequest:
     Does not update any of the None fields
     """

-    document_ids: list[str]
+    minimal_document_indexing_info: list[MinimalDocumentIndexingInfo]
     # all other fields except these 4 will always be left alone by the update request
     access: DocumentAccess | None = None
     document_sets: set[str] | None = None
@@ -136,7 +136,7 @@ def __init__(
         index_name: str,
         secondary_index_name: str | None,
         *args: Any,
-        **kwargs: Any
+        **kwargs: Any,
     ) -> None:
         super().__init__(*args, **kwargs)
         self.index_name = index_name
@@ -218,7 +218,13 @@ class Deletable(abc.ABC):
     """

     @abc.abstractmethod
-    def delete_single(self, doc_id: str) -> int:
+    def delete_single(
+        self,
+        doc_id: str,
+        *,
+        tenant_id: str | None,
+        chunk_count: int | None,
+    ) -> int:
         """
         Given a single document id, hard delete it from the document index

@@ -239,7 +245,14 @@ class Updatable(abc.ABC):
     """

     @abc.abstractmethod
-    def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
+    def update_single(
+        self,
+        doc_id: str,
+        *,
+        tenant_id: str | None,
+        chunk_count: int | None,
+        fields: VespaDocumentFields,
+    ) -> int:
         """
         Updates all chunks for a document with the specified fields.
         None values mean that the field does not need an update.
@@ -257,7 +270,9 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
         raise NotImplementedError

     @abc.abstractmethod
-    def update(self, update_requests: list[UpdateRequest]) -> None:
+    def update(
+        self, update_requests: list[UpdateRequest], *, tenant_id: str | None
+    ) -> None:
         """
         Updates some set of chunks. The document and fields to update are specified in the update
         requests. Each update request in the list applies its changes to a list of document ids.
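To make the new keyword-only contract above concrete before the Vespa implementation below, here is a minimal caller sketch. It assumes `VespaDocumentFields` accepts keyword construction with unset fields defaulting to None (so only `hidden` is changed); `hide_document` is a hypothetical helper written for illustration and is not part of this change, and `document_index` can be any concrete `DocumentIndex` implementation.

```python
# Illustrative only: driving the reshaped Updatable.update_single signature.
from onyx.document_index.interfaces import DocumentIndex, VespaDocumentFields


def hide_document(
    document_index: DocumentIndex,
    doc_id: str,
    tenant_id: str | None,
    chunk_count: int | None,
) -> int:
    # Fields left as None are not touched by the update (per the Updatable docstring).
    fields = VespaDocumentFields(hidden=True)
    return document_index.update_single(
        doc_id,
        tenant_id=tenant_id,      # now keyword-only
        chunk_count=chunk_count,  # lets the index derive chunk IDs without a visit query
        fields=fields,
    )
```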
diff --git a/backend/onyx/document_index/vespa/index.py b/backend/onyx/document_index/vespa/index.py index 80e5504fa8e..ce2c580524f 100644 --- a/backend/onyx/document_index/vespa/index.py +++ b/backend/onyx/document_index/vespa/index.py @@ -13,11 +13,11 @@ from typing import BinaryIO from typing import cast from typing import List +from uuid import UUID import httpx # type: ignore import requests # type: ignore -from onyx.configs.app_configs import DOCUMENT_INDEX_NAME from onyx.configs.chat_configs import DOC_TIME_DECAY from onyx.configs.chat_configs import NUM_RETURNED_HITS from onyx.configs.chat_configs import TITLE_CONTENT_RATIO @@ -25,7 +25,8 @@ from onyx.configs.constants import KV_REINDEX_KEY from onyx.context.search.models import IndexFilters from onyx.context.search.models import InferenceChunkUncleaned -from onyx.document_index.document_index_utils import assemble_document_chunk_info +from onyx.db.engine import get_session_with_tenant +from onyx.document_index.document_index_utils import get_document_chunk_ids from onyx.document_index.interfaces import DocumentIndex from onyx.document_index.interfaces import DocumentInsertionRecord from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo @@ -35,9 +36,6 @@ from onyx.document_index.interfaces import VespaChunkRequest from onyx.document_index.interfaces import VespaDocumentFields from onyx.document_index.vespa.chunk_retrieval import batch_search_api_retrieval -from onyx.document_index.vespa.chunk_retrieval import ( - get_all_vespa_ids_for_document_id, -) from onyx.document_index.vespa.chunk_retrieval import ( parallel_visit_api_retrieval, ) @@ -46,6 +44,9 @@ from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks from onyx.document_index.vespa.indexing_utils import check_for_final_chunk_existence from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy +from onyx.document_index.vespa.indexing_utils import ( + get_multipass_config, +) from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client from onyx.document_index.vespa.shared_utils.utils import ( replace_invalid_doc_id_characters, @@ -337,40 +338,25 @@ def index( # documents that have `chunk_count` in the database, but not for # `old_version` documents. 
- enriched_doc_infos: list[EnrichedDocumentIndexingInfo] = [] - for document_id, doc_count in doc_id_to_previous_chunk_cnt.items(): - last_indexed_chunk = doc_id_to_previous_chunk_cnt.get(document_id, None) - # If the document has no `chunk_count` in the database, we know that it - # has the old chunk ID system and we must check for the final chunk index - is_old_version = False - if last_indexed_chunk is None: - is_old_version = True - minimal_doc_info = MinimalDocumentIndexingInfo( - doc_id=document_id, - chunk_start_index=doc_id_to_new_chunk_cnt.get(document_id, 0), - ) - last_indexed_chunk = check_for_final_chunk_existence( - minimal_doc_info=minimal_doc_info, - start_index=doc_id_to_new_chunk_cnt[document_id], - index_name=self.index_name, - http_client=http_client, - ) + enriched_doc_infos: list[EnrichedDocumentIndexingInfo] = [ + VespaIndex.enrich_basic_chunk_info( + index_name=self.index_name, + http_client=http_client, + document_id=doc_id, + previous_chunk_count=doc_id_to_previous_chunk_cnt.get(doc_id, 0), + new_chunk_count=doc_id_to_new_chunk_cnt.get(doc_id, 0), + ) + for doc_id in doc_id_to_new_chunk_cnt.keys() + ] + for cleaned_doc_info in enriched_doc_infos: # If the document has previously indexed chunks, we know it previously existed - if doc_count or last_indexed_chunk: - existing_docs.add(document_id) - - enriched_doc_info = EnrichedDocumentIndexingInfo( - doc_id=document_id, - chunk_start_index=doc_id_to_new_chunk_cnt.get(document_id, 0), - chunk_end_index=last_indexed_chunk, - old_version=is_old_version, - ) - enriched_doc_infos.append(enriched_doc_info) + if cleaned_doc_info.chunk_end_index: + existing_docs.add(cleaned_doc_info.doc_id) # Now, for each doc, we know exactly where to start and end our deletion # So let's generate the chunk IDs for each chunk to delete - chunks_to_delete = assemble_document_chunk_info( + chunks_to_delete = get_document_chunk_ids( enriched_document_info_list=enriched_doc_infos, tenant_id=tenant_id, large_chunks_enabled=large_chunks_enabled, @@ -447,21 +433,21 @@ def _update_chunk( failure_msg = f"Failed to update document: {future_to_document_id[future]}" raise requests.HTTPError(failure_msg) from e - def update(self, update_requests: list[UpdateRequest]) -> None: + def update( + self, update_requests: list[UpdateRequest], *, tenant_id: str | None + ) -> None: logger.debug(f"Updating {len(update_requests)} documents in Vespa") # Handle Vespa character limitations # Mutating update_requests but it's not used later anyway for update_request in update_requests: - update_request.document_ids = [ - replace_invalid_doc_id_characters(doc_id) - for doc_id in update_request.document_ids - ] + for doc_info in update_request.minimal_document_indexing_info: + doc_info.doc_id = replace_invalid_doc_id_characters(doc_info.doc_id) update_start = time.monotonic() processed_updates_requests: list[_VespaUpdateRequest] = [] - all_doc_chunk_ids: dict[str, list[str]] = {} + all_doc_chunk_ids: dict[str, list[UUID]] = {} # Fetch all chunks for each document ahead of time index_names = [self.index_name] @@ -469,30 +455,24 @@ def update(self, update_requests: list[UpdateRequest]) -> None: index_names.append(self.secondary_index_name) chunk_id_start_time = time.monotonic() - with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: - future_to_doc_chunk_ids = { - executor.submit( - get_all_vespa_ids_for_document_id, - document_id=document_id, - index_name=index_name, - filters=None, - get_large_chunks=True, - ): (document_id, index_name) - for 
index_name in index_names - for update_request in update_requests - for document_id in update_request.document_ids - } - for future in concurrent.futures.as_completed(future_to_doc_chunk_ids): - document_id, index_name = future_to_doc_chunk_ids[future] - try: - doc_chunk_ids = future.result() - if document_id not in all_doc_chunk_ids: - all_doc_chunk_ids[document_id] = [] - all_doc_chunk_ids[document_id].extend(doc_chunk_ids) - except Exception as e: - logger.error( - f"Error retrieving chunk IDs for document {document_id} in index {index_name}: {e}" - ) + with get_vespa_http_client() as http_client: + for update_request in update_requests: + for doc_info in update_request.minimal_document_indexing_info: + for index_name in index_names: + doc_chunk_info = VespaIndex.enrich_basic_chunk_info( + index_name=index_name, + http_client=http_client, + document_id=doc_info.doc_id, + previous_chunk_count=doc_info.chunk_start_index, + new_chunk_count=0, + ) + doc_chunk_ids = get_document_chunk_ids( + enriched_document_info_list=[doc_chunk_info], + tenant_id=tenant_id, + large_chunks_enabled=False, + ) + all_doc_chunk_ids[doc_info.doc_id] = doc_chunk_ids + logger.debug( f"Took {time.monotonic() - chunk_id_start_time:.2f} seconds to fetch all Vespa chunk IDs" ) @@ -521,11 +501,11 @@ def update(self, update_requests: list[UpdateRequest]) -> None: logger.error("Update request received but nothing to update") continue - for document_id in update_request.document_ids: - for doc_chunk_id in all_doc_chunk_ids[document_id]: + for doc_info in update_request.minimal_document_indexing_info: + for doc_chunk_id in all_doc_chunk_ids[doc_info.doc_id]: processed_updates_requests.append( _VespaUpdateRequest( - document_id=document_id, + document_id=doc_info.doc_id, url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}/{doc_chunk_id}", update_request=update_dict, ) @@ -537,103 +517,122 @@ def update(self, update_requests: list[UpdateRequest]) -> None: time.monotonic() - update_start, ) - def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int: - """Note: if the document id does not exist, the update will be a no-op and the - function will complete with no errors or exceptions. - Handle other exceptions if you wish to implement retry behavior + def update_single_chunk( + self, + doc_chunk_id: UUID, + index_name: str, + fields: VespaDocumentFields, + doc_id: str, + ) -> None: + """ + Update a single "chunk" (document) in Vespa using its chunk ID. """ - total_chunks_updated = 0 - - # Handle Vespa character limitations - # Mutating update_request but it's not used later anyway - normalized_doc_id = replace_invalid_doc_id_characters(doc_id) - - # Build the _VespaUpdateRequest objects update_dict: dict[str, dict] = {"fields": {}} + if fields.boost is not None: update_dict["fields"][BOOST] = {"assign": fields.boost} + if fields.document_sets is not None: + # WeightedSet needs a map { item: weight, ... 
} update_dict["fields"][DOCUMENT_SETS] = { "assign": {document_set: 1 for document_set in fields.document_sets} } + if fields.access is not None: + # Similar to above update_dict["fields"][ACCESS_CONTROL_LIST] = { "assign": {acl_entry: 1 for acl_entry in fields.access.to_acl()} } + if fields.hidden is not None: update_dict["fields"][HIDDEN] = {"assign": fields.hidden} if not update_dict["fields"]: - logger.error("Update request received but nothing to update") - return 0 + logger.error("Update request received but nothing to update.") + return - index_names = [self.index_name] - if self.secondary_index_name: - index_names.append(self.secondary_index_name) + vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}?create=true" with get_vespa_http_client(http2=False) as http_client: - for index_name in index_names: - params = httpx.QueryParams( - { - "selection": f"{index_name}.document_id=='{normalized_doc_id}'", - "cluster": DOCUMENT_INDEX_NAME, - } + try: + resp = http_client.put( + vespa_url, + headers={"Content-Type": "application/json"}, + json=update_dict, ) + resp.raise_for_status() + except httpx.HTTPStatusError as e: + error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}" + logger.error(error_message) + raise - while True: - try: - vespa_url = ( - f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}" - ) - logger.debug(f'update_single PUT on URL "{vespa_url}"') - resp = http_client.put( - vespa_url, - params=params, - headers={"Content-Type": "application/json"}, - json=update_dict, - ) - - resp.raise_for_status() - except httpx.HTTPStatusError as e: - logger.error( - f"Failed to update chunks, details: {e.response.text}" - ) - raise + def update_single( + self, + doc_id: str, + *, + chunk_count: int | None, + tenant_id: str | None, + fields: VespaDocumentFields, + ) -> int: + """Note: if the document id does not exist, the update will be a no-op and the + function will complete with no errors or exceptions. 
+        Handle other exceptions if you wish to implement retry behavior
+        """
-        resp_data = resp.json()
+        doc_chunk_count = 0
-                    if "documentCount" in resp_data:
-                        chunks_updated = resp_data["documentCount"]
-                        total_chunks_updated += chunks_updated
+        index_names = [self.index_name]
+        if self.secondary_index_name:
+            index_names.append(self.secondary_index_name)
-                    # Check for continuation token to handle pagination
-                    if "continuation" not in resp_data:
-                        break  # Exit loop if no continuation token
+        with get_vespa_http_client(http2=False) as http_client:
+            for index_name in index_names:
+                with get_session_with_tenant(tenant_id=tenant_id) as db_session:
+                    multipass_config = get_multipass_config(
+                        db_session=db_session,
+                        primary_index=index_name == self.index_name,
+                    )
+                    large_chunks_enabled = multipass_config.enable_large_chunks
+                enriched_doc_infos = VespaIndex.enrich_basic_chunk_info(
+                    index_name=index_name,
+                    http_client=http_client,
+                    document_id=doc_id,
+                    previous_chunk_count=chunk_count,
+                    new_chunk_count=0,
+                )
-                    if not resp_data["continuation"]:
-                        break  # Exit loop if continuation token is empty
+                doc_chunk_ids = get_document_chunk_ids(
+                    enriched_document_info_list=[enriched_doc_infos],
+                    tenant_id=tenant_id,
+                    large_chunks_enabled=large_chunks_enabled,
+                )
-                    params = params.set("continuation", resp_data["continuation"])
+                logger.debug(
+                    f"Updating {len(doc_chunk_ids)} chunks for doc={doc_id} index={index_name}"
+                )
-                logger.debug(
-                    f"VespaIndex.update_single: "
-                    f"index={index_name} "
-                    f"doc={normalized_doc_id} "
-                    f"chunks_updated={total_chunks_updated}"
-                )
+                doc_chunk_count += len(doc_chunk_ids)
-        return total_chunks_updated
+                for doc_chunk_id in doc_chunk_ids:
+                    logger.debug(f"Updating chunk {doc_chunk_id}")
+                    self.update_single_chunk(
+                        doc_chunk_id=doc_chunk_id,
+                        index_name=index_name,
+                        fields=fields,
+                        doc_id=doc_id,
+                    )
+        logger.debug(
+            f"VespaIndex.update_single: doc={doc_id} chunks_updated={doc_chunk_count}"
+        )

-    def delete_single(self, doc_id: str) -> int:
-        """Possibly faster overall than the delete method due to using a single
-        delete call with a selection query."""
+        return doc_chunk_count

+    def delete_single(
+        self,
+        doc_id: str,
+        *,
+        tenant_id: str | None,
+        chunk_count: int | None,
+    ) -> int:
         total_chunks_deleted = 0
-        # Vespa deletion is poorly documented ... luckily we found this
-        # https://docs.vespa.ai/en/operations/batch-delete.html#example
-        doc_id = replace_invalid_doc_id_characters(doc_id)
         # NOTE: using `httpx` here since `requests` doesn't support HTTP2.
This is beneficial for @@ -642,53 +641,41 @@ def delete_single(self, doc_id: str) -> int: if self.secondary_index_name: index_names.append(self.secondary_index_name) - with get_vespa_http_client(http2=False) as http_client: + with get_vespa_http_client( + http2=False + ) as http_client, concurrent.futures.ThreadPoolExecutor( + max_workers=NUM_THREADS + ) as executor: for index_name in index_names: - params = httpx.QueryParams( - { - "selection": f"{index_name}.document_id=='{doc_id}'", - "cluster": DOCUMENT_INDEX_NAME, - } - ) - - while True: - try: - vespa_url = ( - f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}" - ) - logger.debug(f'delete_single DELETE on URL "{vespa_url}"') - resp = http_client.delete( - vespa_url, - params=params, - ) - resp.raise_for_status() - except httpx.HTTPStatusError as e: - logger.error( - f"Failed to delete chunk, details: {e.response.text}" - ) - raise - - resp_data = resp.json() - - if "documentCount" in resp_data: - chunks_deleted = resp_data["documentCount"] - total_chunks_deleted += chunks_deleted - - # Check for continuation token to handle pagination - if "continuation" not in resp_data: - break # Exit loop if no continuation token - - if not resp_data["continuation"]: - break # Exit loop if continuation token is empty - - params = params.set("continuation", resp_data["continuation"]) + with get_session_with_tenant(tenant_id=tenant_id) as db_session: + multipass_config = get_multipass_config( + db_session=db_session, + primary_index=index_name == self.index_name, + ) + large_chunks_enabled = multipass_config.enable_large_chunks - logger.debug( - f"VespaIndex.delete_single: " - f"index={index_name} " - f"doc={doc_id} " - f"chunks_deleted={total_chunks_deleted}" + enriched_doc_infos = VespaIndex.enrich_basic_chunk_info( + index_name=index_name, + http_client=http_client, + document_id=doc_id, + previous_chunk_count=chunk_count, + new_chunk_count=0, ) + chunks_to_delete = get_document_chunk_ids( + enriched_document_info_list=[enriched_doc_infos], + tenant_id=tenant_id, + large_chunks_enabled=large_chunks_enabled, + ) + for doc_chunk_ids_batch in batch_generator( + chunks_to_delete, BATCH_SIZE + ): + total_chunks_deleted += len(doc_chunk_ids_batch) + delete_vespa_chunks( + doc_chunk_ids=doc_chunk_ids_batch, + index_name=index_name, + http_client=http_client, + executor=executor, + ) return total_chunks_deleted @@ -787,8 +774,51 @@ def admin_retrieval( return query_vespa(params) + # Retrieves chunk information for a document: + # - Determines the last indexed chunk + # - Identifies if the document uses the old or new chunk ID system + # This data is crucial for Vespa document updates without relying on the visit API. 
@classmethod - def delete_entries_by_tenant_id(cls, tenant_id: str, index_name: str) -> None: + def enrich_basic_chunk_info( + cls, + index_name: str, + http_client: httpx.Client, + document_id: str, + previous_chunk_count: int | None = None, + new_chunk_count: int = 0, + ) -> EnrichedDocumentIndexingInfo: + last_indexed_chunk = previous_chunk_count + + # If the document has no `chunk_count` in the database, we know that it + # has the old chunk ID system and we must check for the final chunk index + is_old_version = False + if last_indexed_chunk is None: + is_old_version = True + minimal_doc_info = MinimalDocumentIndexingInfo( + doc_id=document_id, chunk_start_index=new_chunk_count + ) + last_indexed_chunk = check_for_final_chunk_existence( + minimal_doc_info=minimal_doc_info, + start_index=new_chunk_count, + index_name=index_name, + http_client=http_client, + ) + + enriched_doc_info = EnrichedDocumentIndexingInfo( + doc_id=document_id, + chunk_start_index=new_chunk_count, + chunk_end_index=last_indexed_chunk, + old_version=is_old_version, + ) + return enriched_doc_info + + @classmethod + def delete_entries_by_tenant_id( + cls, + *, + tenant_id: str, + index_name: str, + ) -> None: """ Deletes all entries in the specified index with the given tenant_id. diff --git a/backend/onyx/document_index/vespa/indexing_utils.py b/backend/onyx/document_index/vespa/indexing_utils.py index 6781cae1d93..ed802ada9ac 100644 --- a/backend/onyx/document_index/vespa/indexing_utils.py +++ b/backend/onyx/document_index/vespa/indexing_utils.py @@ -7,10 +7,15 @@ import httpx from retry import retry +from sqlalchemy.orm import Session +from onyx.configs.app_configs import ENABLE_MULTIPASS_INDEXING from onyx.connectors.cross_connector_utils.miscellaneous_utils import ( get_experts_stores_representations, ) +from onyx.db.models import SearchSettings +from onyx.db.search_settings import get_current_search_settings +from onyx.db.search_settings import get_secondary_search_settings from onyx.document_index.document_index_utils import get_uuid_from_chunk from onyx.document_index.document_index_utils import get_uuid_from_chunk_info_old from onyx.document_index.interfaces import MinimalDocumentIndexingInfo @@ -45,6 +50,8 @@ from onyx.document_index.vespa_constants import TITLE from onyx.document_index.vespa_constants import TITLE_EMBEDDING from onyx.indexing.models import DocMetadataAwareIndexChunk +from onyx.indexing.models import EmbeddingProvider +from onyx.indexing.models import MultipassConfig from onyx.utils.logger import setup_logger logger = setup_logger() @@ -129,7 +136,9 @@ def _index_vespa_chunk( document = chunk.source_document # No minichunk documents in vespa, minichunk vectors are stored in the chunk itself + vespa_chunk_id = str(get_uuid_from_chunk(chunk)) + embeddings = chunk.embeddings embeddings_name_vector_map = {"full_chunk": embeddings.full_embedding} @@ -263,5 +272,49 @@ def check_for_final_chunk_existence( ) if not _does_doc_chunk_exist(doc_chunk_id, index_name, http_client): return index - index += 1 + + +def should_use_multipass(search_settings: SearchSettings | None) -> bool: + """ + Determines whether multipass should be used based on the search settings + or the default config if settings are unavailable. 
+ """ + if search_settings is not None: + return search_settings.multipass_indexing + return ENABLE_MULTIPASS_INDEXING + + +def can_use_large_chunks(multipass: bool, search_settings: SearchSettings) -> bool: + """ + Given multipass usage and an embedder, decides whether large chunks are allowed + based on model/provider constraints. + """ + # Only local models that support a larger context are from Nomic + # Cohere does not support larger contexts (they recommend not going above ~512 tokens) + return ( + multipass + and search_settings.model_name.startswith("nomic-ai") + and search_settings.provider_type != EmbeddingProvider.COHERE + ) + + +def get_multipass_config( + db_session: Session, primary_index: bool = True +) -> MultipassConfig: + """ + Determines whether to enable multipass and large chunks by examining + the current search settings and the embedder configuration. + """ + search_settings = ( + get_current_search_settings(db_session) + if primary_index + else get_secondary_search_settings(db_session) + ) + multipass = should_use_multipass(search_settings) + if not search_settings: + return MultipassConfig(multipass_indexing=False, enable_large_chunks=False) + enable_large_chunks = can_use_large_chunks(multipass, search_settings) + return MultipassConfig( + multipass_indexing=multipass, enable_large_chunks=enable_large_chunks + ) diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py index 1a2e73b2ab1..74c6c08fd8a 100644 --- a/backend/onyx/indexing/indexing_pipeline.py +++ b/backend/onyx/indexing/indexing_pipeline.py @@ -11,7 +11,6 @@ from onyx.access.access import get_access_for_documents from onyx.access.models import DocumentAccess -from onyx.configs.app_configs import ENABLE_MULTIPASS_INDEXING from onyx.configs.app_configs import INDEXING_EXCEPTION_LIMIT from onyx.configs.app_configs import MAX_DOCUMENT_CHARS from onyx.configs.constants import DEFAULT_BOOST @@ -31,12 +30,14 @@ from onyx.db.document_set import fetch_document_sets_for_documents from onyx.db.index_attempt import create_index_attempt_error from onyx.db.models import Document as DBDocument -from onyx.db.search_settings import get_current_search_settings from onyx.db.tag import create_or_add_document_tag from onyx.db.tag import create_or_add_document_tag_list from onyx.document_index.interfaces import DocumentIndex from onyx.document_index.interfaces import DocumentMetadata from onyx.document_index.interfaces import IndexBatchParams +from onyx.document_index.vespa.indexing_utils import ( + get_multipass_config, +) from onyx.indexing.chunker import Chunker from onyx.indexing.embedder import IndexingEmbedder from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface @@ -44,7 +45,6 @@ from onyx.indexing.models import DocMetadataAwareIndexChunk from onyx.utils.logger import setup_logger from onyx.utils.timing import log_function_time -from shared_configs.enums import EmbeddingProvider logger = setup_logger() @@ -479,28 +479,6 @@ def index_doc_batch( return result -def check_enable_large_chunks_and_multipass( - embedder: IndexingEmbedder, db_session: Session -) -> tuple[bool, bool]: - search_settings = get_current_search_settings(db_session) - multipass = ( - search_settings.multipass_indexing - if search_settings - else ENABLE_MULTIPASS_INDEXING - ) - - enable_large_chunks = ( - multipass - and - # Only local models that supports larger context are from Nomic - (embedder.model_name.startswith("nomic-ai")) - and - # Cohere does not support larger context they 
recommend not going above 512 tokens - embedder.provider_type != EmbeddingProvider.COHERE - ) - return multipass, enable_large_chunks - - def build_indexing_pipeline( *, embedder: IndexingEmbedder, @@ -513,14 +491,12 @@ def build_indexing_pipeline( callback: IndexingHeartbeatInterface | None = None, ) -> IndexingPipelineProtocol: """Builds a pipeline which takes in a list (batch) of docs and indexes them.""" - multipass, enable_large_chunks = check_enable_large_chunks_and_multipass( - embedder, db_session - ) + multipass_config = get_multipass_config(db_session, primary_index=True) chunker = chunker or Chunker( tokenizer=embedder.embedding_model.tokenizer, - enable_multipass=multipass, - enable_large_chunks=enable_large_chunks, + enable_multipass=multipass_config.multipass_indexing, + enable_large_chunks=multipass_config.enable_large_chunks, # after every doc, update status in case there are a bunch of really long docs callback=callback, ) diff --git a/backend/onyx/indexing/models.py b/backend/onyx/indexing/models.py index e536428282b..44a8419cb95 100644 --- a/backend/onyx/indexing/models.py +++ b/backend/onyx/indexing/models.py @@ -154,3 +154,8 @@ def from_db_model(cls, search_settings: "SearchSettings") -> "IndexingSetting": index_name=search_settings.index_name, multipass_indexing=search_settings.multipass_indexing, ) + + +class MultipassConfig(BaseModel): + multipass_indexing: bool + enable_large_chunks: bool diff --git a/backend/scripts/force_delete_connector_by_id.py b/backend/scripts/force_delete_connector_by_id.py index bc994238870..39b98b9bcb8 100755 --- a/backend/scripts/force_delete_connector_by_id.py +++ b/backend/scripts/force_delete_connector_by_id.py @@ -5,6 +5,7 @@ from sqlalchemy import delete from sqlalchemy.orm import Session +from onyx.db.document import delete_documents_complete__no_commit from onyx.db.enums import ConnectorCredentialPairStatus # Modify sys.path @@ -38,7 +39,6 @@ from onyx.document_index.factory import get_default_document_index from onyx.file_store.file_store import get_default_file_store from onyx.document_index.document_index_utils import get_both_index_names -from onyx.db.document import delete_documents_complete__no_commit # pylint: enable=E402 # flake8: noqa: E402 @@ -71,13 +71,16 @@ def _unsafe_deletion( if not documents: break - document_ids = [document.id for document in documents] - for doc_id in document_ids: - document_index.delete_single(doc_id) + for document in documents: + document_index.delete_single( + doc_id=document.id, + tenant_id=None, + chunk_count=document.chunk_count, + ) delete_documents_complete__no_commit( db_session=db_session, - document_ids=document_ids, + document_ids=[document.id for document in documents], ) num_docs_deleted += len(documents) @@ -216,6 +219,7 @@ def _delete_connector(cc_pair_id: int, db_session: Session) -> None: parser.add_argument( "connector_id", type=int, help="The ID of the connector to delete" ) + args = parser.parse_args() with get_session_context_manager() as db_session: _delete_connector(args.connector_id, db_session) diff --git a/backend/scripts/orphan_doc_cleanup_script.py b/backend/scripts/orphan_doc_cleanup_script.py index ea776e770eb..c138bdc64a6 100644 --- a/backend/scripts/orphan_doc_cleanup_script.py +++ b/backend/scripts/orphan_doc_cleanup_script.py @@ -15,6 +15,7 @@ from onyx.db.document import delete_documents_complete__no_commit # noqa: E402 from onyx.db.search_settings import get_current_search_settings # noqa: E402 from onyx.document_index.vespa.index import VespaIndex # noqa: 
E402 +from onyx.db.document import get_document # noqa: E402 BATCH_SIZE = 100 @@ -63,6 +64,9 @@ def main() -> None: with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor: def process_doc(doc_id: str) -> str | None: + document = get_document(doc_id, db_session) + if not document: + return None # Check if document exists in Vespa first try: chunks = vespa_index.id_based_retrieval( @@ -83,7 +87,9 @@ def process_doc(doc_id: str) -> str | None: try: print(f"Deleting document {doc_id} in Vespa") - chunks_deleted = vespa_index.delete_single(doc_id) + chunks_deleted = vespa_index.delete_single( + doc_id, tenant_id=None, chunk_count=document.chunk_count + ) if chunks_deleted > 0: print( f"Deleted {chunks_deleted} chunks for document {doc_id}" diff --git a/backend/tests/__init__.py b/backend/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/backend/tests/integration/__init__.py b/backend/tests/integration/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/backend/tests/integration/common_utils/managers/document.py b/backend/tests/integration/common_utils/managers/document.py index 939a420f014..9ce3430fe62 100644 --- a/backend/tests/integration/common_utils/managers/document.py +++ b/backend/tests/integration/common_utils/managers/document.py @@ -21,8 +21,9 @@ def _verify_document_permissions( group_names: list[str] | None = None, doc_creating_user: DATestUser | None = None, ) -> None: - acl_keys = set(retrieved_doc["access_control_list"].keys()) + acl_keys = set(retrieved_doc.get("access_control_list", {}).keys()) print(f"ACL keys: {acl_keys}") + if cc_pair.access_type == AccessType.PUBLIC: if "PUBLIC" not in acl_keys: raise ValueError( @@ -42,8 +43,9 @@ def _verify_document_permissions( found_group_keys = {key for key in acl_keys if key.startswith("group:")} if found_group_keys != expected_group_keys: raise ValueError( - f"Document {retrieved_doc['document_id']} has incorrect group ACL keys. Found: {found_group_keys}, \n" - f"Expected: {expected_group_keys}" + f"Document {retrieved_doc['document_id']} has incorrect group ACL keys. " + f"Expected: {expected_group_keys} Found: {found_group_keys}\n" + f"All ACL keys: {acl_keys}" ) if doc_set_names is not None: @@ -153,9 +155,11 @@ def verify( ) -> None: doc_ids = [document.id for document in cc_pair.documents] retrieved_docs_dict = vespa_client.get_documents_by_id(doc_ids)["documents"] + retrieved_docs = { doc["fields"]["document_id"]: doc["fields"] for doc in retrieved_docs_dict } + # Left this here for debugging purposes. # import json # for doc in retrieved_docs.values():
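For reference, the property the Vespa changes above depend on (a document's chunk IDs are fully determined by its document ID, chunk count, and tenant) can be sketched with the standard library alone. This is a simplified, hedged illustration of `get_uuid_from_chunk_info`: it ignores large-chunk IDs and doc-ID sanitization, and `sketch_chunk_uuid` is an illustrative name, not a function in the codebase.

```python
# Simplified sketch of the chunk-ID scheme: uuid5 over "<doc_id>_<chunk_index>",
# with "_<tenant_id>" appended only when multi-tenancy is enabled.
import uuid


def sketch_chunk_uuid(
    doc_id: str, chunk_index: int, tenant_id: str | None, multi_tenant: bool
) -> uuid.UUID:
    unique_identifier_string = f"{doc_id}_{chunk_index}"
    if tenant_id and multi_tenant:
        unique_identifier_string += "_" + tenant_id
    return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)


if __name__ == "__main__":
    # With a stored chunk_count of 4, every chunk's Vespa document ID is derivable
    # up front, which is the property delete_single/update_single now exploit
    # instead of paginating through Vespa's visit/selection API.
    for i in range(4):
        print(sketch_chunk_uuid("doc-123", i, "tenant-a", multi_tenant=True))
```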