Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fully remove visit API #3621

Merged
merged 11 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,35 @@ def __init__(self, index: DocumentIndex):
wait=wait_random_exponential(multiplier=1, max=MAX_WAIT),
stop=stop_after_delay(STOP_AFTER),
)
def delete_single(self, doc_id: str) -> int:
return self.index.delete_single(doc_id)
def delete_single(
self,
doc_id: str,
*,
tenant_id: str | None,
chunk_count: int | None,
) -> int:
return self.index.delete_single(
doc_id,
tenant_id=tenant_id,
chunk_count=chunk_count,
)

@retry(
retry=retry_if_exception_type(httpx.ReadTimeout),
wait=wait_random_exponential(multiplier=1, max=MAX_WAIT),
stop=stop_after_delay(STOP_AFTER),
)
def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
return self.index.update_single(doc_id, fields)
def update_single(
self,
doc_id: str,
*,
tenant_id: str | None,
chunk_count: int | None,
fields: VespaDocumentFields,
) -> int:
return self.index.update_single(
doc_id,
tenant_id=tenant_id,
chunk_count=chunk_count,
fields=fields,
)
16 changes: 14 additions & 2 deletions backend/onyx/background/celery/tasks/shared/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from onyx.configs.constants import OnyxCeleryTask
from onyx.db.document import delete_document_by_connector_credential_pair__no_commit
from onyx.db.document import delete_documents_complete__no_commit
from onyx.db.document import fetch_chunk_count_for_document
from onyx.db.document import get_document
from onyx.db.document import get_document_connector_count
from onyx.db.document import mark_document_as_modified
Expand Down Expand Up @@ -80,7 +81,13 @@ def document_by_cc_pair_cleanup_task(
# delete it from vespa and the db
action = "delete"

chunks_affected = retry_index.delete_single(document_id)
chunk_count = fetch_chunk_count_for_document(document_id, db_session)

chunks_affected = retry_index.delete_single(
document_id,
tenant_id=tenant_id,
chunk_count=chunk_count,
)
delete_documents_complete__no_commit(
db_session=db_session,
document_ids=[document_id],
Expand Down Expand Up @@ -110,7 +117,12 @@ def document_by_cc_pair_cleanup_task(
)

# update Vespa. OK if doc doesn't exist. Raises exception otherwise.
chunks_affected = retry_index.update_single(document_id, fields=fields)
chunks_affected = retry_index.update_single(
document_id,
tenant_id=tenant_id,
chunk_count=doc.chunk_count,
fields=fields,
)

# there are still other cc_pair references to the doc, so just resync to Vespa
delete_document_by_connector_credential_pair__no_commit(
Expand Down
7 changes: 6 additions & 1 deletion backend/onyx/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,12 @@ def vespa_metadata_sync_task(
)

# update Vespa. OK if doc doesn't exist. Raises exception otherwise.
chunks_affected = retry_index.update_single(document_id, fields)
chunks_affected = retry_index.update_single(
document_id,
tenant_id=tenant_id,
chunk_count=doc.chunk_count,
fields=fields,
)

# update db last. Worst case = we crash right before this and
# the sync might repeat again later
Expand Down
23 changes: 15 additions & 8 deletions backend/onyx/db/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,20 +685,27 @@ def get_document_sources(
def fetch_chunk_counts_for_documents(
document_ids: list[str],
db_session: Session,
) -> list[tuple[str, int | None]]:
) -> list[tuple[str, int]]:
"""
Return a list of (document_id, chunk_count) tuples.
Note: chunk_count might be None if not set in DB,
so we declare it as Optional[int].
If a document_id is not found in the database, it will be returned with a chunk_count of 0.
"""
stmt = select(DbDocument.id, DbDocument.chunk_count).where(
DbDocument.id.in_(document_ids)
)

# results is a list of 'Row' objects, each containing two columns
results = db_session.execute(stmt).all()

# If DbDocument.id is guaranteed to be a string, you can just do row.id;
# otherwise cast to str if you need to be sure it's a string:
return [(str(row[0]), row[1]) for row in results]
# or row.id, row.chunk_count if they are named attributes in your ORM model
# Create a dictionary of document_id to chunk_count
chunk_counts = {str(row.id): row.chunk_count or 0 for row in results}

# Return a list of tuples, using 0 for documents not found in the database
return [(doc_id, chunk_counts.get(doc_id, 0)) for doc_id in document_ids]


def fetch_chunk_count_for_document(
    document_id: str,
    db_session: Session,
) -> int | None:
    """Look up the stored chunk count for a single document.

    Selects only the ``chunk_count`` column of the ``DbDocument`` row whose
    id equals ``document_id`` and returns it as a scalar.

    Returns ``None`` when the query yields no row. A row whose
    ``chunk_count`` is unset presumably also comes back as ``None``, so
    callers should not rely on ``None`` to tell the two cases apart.
    """
    chunk_count_query = select(DbDocument.chunk_count).where(
        DbDocument.id == document_id
    )
    query_result = db_session.execute(chunk_count_query)
    return query_result.scalar_one_or_none()
12 changes: 8 additions & 4 deletions backend/onyx/document_index/document_index_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from onyx.db.search_settings import get_secondary_search_settings
from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
from onyx.indexing.models import DocMetadataAwareIndexChunk

from shared_configs.configs import MULTI_TENANT

DEFAULT_BATCH_SIZE = 30
DEFAULT_INDEX_NAME = "danswer_chunk"
Expand Down Expand Up @@ -37,7 +37,10 @@ def translate_boost_count_to_multiplier(boost: int) -> float:
return 2 / (1 + math.exp(-1 * boost / 3))


def assemble_document_chunk_info(
# Assembles a list of Vespa chunk IDs for a document
# given the required context. This can be used to directly query
# Vespa's Document API.
def get_document_chunk_ids(
enriched_document_info_list: list[EnrichedDocumentIndexingInfo],
tenant_id: str | None,
large_chunks_enabled: bool,
Expand Down Expand Up @@ -110,10 +113,11 @@ def get_uuid_from_chunk_info(
"large_" + str(large_chunk_id) if large_chunk_id is not None else str(chunk_id)
)
unique_identifier_string = "_".join([doc_str, chunk_index])
if tenant_id:
if tenant_id and MULTI_TENANT:
unique_identifier_string += "_" + tenant_id

return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)
uuid_value = uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)
return uuid_value


def get_uuid_from_chunk_info_old(
Expand Down
25 changes: 20 additions & 5 deletions backend/onyx/document_index/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class UpdateRequest:
Does not update any of the None fields
"""

document_ids: list[str]
minimal_document_indexing_info: list[MinimalDocumentIndexingInfo]
# all other fields except these 4 will always be left alone by the update request
access: DocumentAccess | None = None
document_sets: set[str] | None = None
Expand All @@ -136,7 +136,7 @@ def __init__(
index_name: str,
secondary_index_name: str | None,
*args: Any,
**kwargs: Any
**kwargs: Any,
) -> None:
super().__init__(*args, **kwargs)
self.index_name = index_name
Expand Down Expand Up @@ -218,7 +218,13 @@ class Deletable(abc.ABC):
"""

@abc.abstractmethod
def delete_single(self, doc_id: str) -> int:
def delete_single(
self,
doc_id: str,
*,
tenant_id: str | None,
chunk_count: int | None,
) -> int:
"""
Given a single document id, hard delete it from the document index
Expand All @@ -239,7 +245,14 @@ class Updatable(abc.ABC):
"""

@abc.abstractmethod
def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
def update_single(
self,
doc_id: str,
*,
tenant_id: str | None,
chunk_count: int | None,
fields: VespaDocumentFields,
) -> int:
"""
Updates all chunks for a document with the specified fields.
None values mean that the field does not need an update.
Expand All @@ -257,7 +270,9 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
raise NotImplementedError

@abc.abstractmethod
def update(self, update_requests: list[UpdateRequest]) -> None:
def update(
self, update_requests: list[UpdateRequest], *, tenant_id: str | None
) -> None:
"""
Updates some set of chunks. The document and fields to update are specified in the update
requests. Each update request in the list applies its changes to a list of document ids.
Expand Down
Loading
Loading