From 97a963b4bf2d992639f8b29fbb1df1dd2b9f6566 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Wed, 8 Jan 2025 16:56:55 -0800 Subject: [PATCH 01/29] add index to speed up get last attempt (#3636) * add index to speed up get last attempt * use descending order * put back unique param * how did this not get formatted? --------- Co-authored-by: Richard Kuo (Danswer) --- ..._add_composite_index_for_index_attempt_.py | 35 +++++++++++++++++++ backend/onyx/db/models.py | 8 +++++ 2 files changed, 43 insertions(+) create mode 100644 backend/alembic/versions/369644546676_add_composite_index_for_index_attempt_.py diff --git a/backend/alembic/versions/369644546676_add_composite_index_for_index_attempt_.py b/backend/alembic/versions/369644546676_add_composite_index_for_index_attempt_.py new file mode 100644 index 00000000000..4e0384fe486 --- /dev/null +++ b/backend/alembic/versions/369644546676_add_composite_index_for_index_attempt_.py @@ -0,0 +1,35 @@ +"""add composite index for index attempt time updated + +Revision ID: 369644546676 +Revises: 2955778aa44c +Create Date: 2025-01-08 15:38:17.224380 + +""" +from alembic import op +from sqlalchemy import text + +# revision identifiers, used by Alembic. +revision = "369644546676" +down_revision = "2955778aa44c" +branch_labels: None = None +depends_on: None = None + + +def upgrade() -> None: + op.create_index( + "ix_index_attempt_ccpair_search_settings_time_updated", + "index_attempt", + [ + "connector_credential_pair_id", + "search_settings_id", + text("time_updated DESC"), + ], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index( + "ix_index_attempt_ccpair_search_settings_time_updated", + table_name="index_attempt", + ) diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index 87370e91f67..ff1c98d13d8 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -18,6 +18,7 @@ from fastapi_users_db_sqlalchemy.generics import TIMESTAMPAware from sqlalchemy import Boolean from sqlalchemy import DateTime +from sqlalchemy import desc from sqlalchemy import Enum from sqlalchemy import Float from sqlalchemy import ForeignKey @@ -813,6 +814,13 @@ class IndexAttempt(Base): "connector_credential_pair_id", "time_created", ), + Index( + "ix_index_attempt_ccpair_search_settings_time_updated", + "connector_credential_pair_id", + "search_settings_id", + desc("time_updated"), + unique=False, + ), ) def __repr__(self) -> str: From d40fd82803572ed8305d33df8448121c4a6783f3 Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Thu, 9 Jan 2025 12:56:56 -0800 Subject: [PATCH 02/29] Conf doc sync improvements (#3643) * Reduce number of requests to Confluence * undo * added a way to dynamically adjust the pagination limit * undo --- .../confluence/doc_sync.py | 7 ++++ .../connectors/confluence/onyx_confluence.py | 40 ++++++++++++++----- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/backend/ee/onyx/external_permissions/confluence/doc_sync.py b/backend/ee/onyx/external_permissions/confluence/doc_sync.py index 708be895dd0..bd78a8eade4 100644 --- a/backend/ee/onyx/external_permissions/confluence/doc_sync.py +++ b/backend/ee/onyx/external_permissions/confluence/doc_sync.py @@ -67,6 +67,13 @@ def _get_server_space_permissions( else: logger.warning(f"Email for user {user_name} not found in Confluence") + if not user_emails and not group_names: + logger.warning( + "No user emails or group names found in Confluence space permissions" + f"\nSpace key: {space_key}" + f"\nSpace permissions: {space_permissions}" + ) + return ExternalAccess( external_user_emails=user_emails, external_user_group_ids=group_names, diff --git a/backend/onyx/connectors/confluence/onyx_confluence.py b/backend/onyx/connectors/confluence/onyx_confluence.py index ea8a7a67e74..d95fa19630e 100644 --- a/backend/onyx/connectors/confluence/onyx_confluence.py +++ b/backend/onyx/connectors/confluence/onyx_confluence.py @@ -121,6 +121,7 @@ def wrapped_call(*args: list[Any], **kwargs: Any) -> Any: _DEFAULT_PAGINATION_LIMIT = 1000 +_MINIMUM_PAGINATION_LIMIT = 50 class OnyxConfluence(Confluence): @@ -204,24 +205,41 @@ def _paginate_url( # If the problematic expansion is in the url, replace it # with the replacement expansion and try again # If that fails, raise the error - if _PROBLEMATIC_EXPANSIONS not in url_suffix: - logger.exception( + if _PROBLEMATIC_EXPANSIONS in url_suffix: + logger.warning( + f"Replacing {_PROBLEMATIC_EXPANSIONS} with {_REPLACEMENT_EXPANSIONS}" + " and trying again." + ) + url_suffix = url_suffix.replace( + _PROBLEMATIC_EXPANSIONS, + _REPLACEMENT_EXPANSIONS, + ) + continue + if ( + raw_response.status_code == 500 + and limit > _MINIMUM_PAGINATION_LIMIT + ): + new_limit = limit // 2 + logger.warning( f"Error in confluence call to {url_suffix} \n" f"Raw Response Text: {raw_response.text} \n" f"Full Response: {raw_response.__dict__} \n" f"Error: {e} \n" + f"Reducing limit from {limit} to {new_limit} and trying again." ) - raise e + url_suffix = url_suffix.replace( + f"limit={limit}", f"limit={new_limit}" + ) + limit = new_limit + continue - logger.warning( - f"Replacing {_PROBLEMATIC_EXPANSIONS} with {_REPLACEMENT_EXPANSIONS}" - " and trying again." - ) - url_suffix = url_suffix.replace( - _PROBLEMATIC_EXPANSIONS, - _REPLACEMENT_EXPANSIONS, + logger.exception( + f"Error in confluence call to {url_suffix} \n" + f"Raw Response Text: {raw_response.text} \n" + f"Full Response: {raw_response.__dict__} \n" + f"Error: {e} \n" ) - continue + raise e try: next_response = raw_response.json() From 2ae91f0f2ba25f4f5a86ed848a7c99e404480361 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Thu, 9 Jan 2025 13:34:07 -0800 Subject: [PATCH 03/29] Feature/redis prod tool (#3619) * prototype tools for handling prod issues * add some commands * add batching and dry run options * custom redis tool * comment * default to app config settings for redis --------- Co-authored-by: Richard Kuo (Danswer) --- backend/scripts/celery_purge_queue.py | 87 +++++++++++ backend/scripts/onyx_redis.py | 198 ++++++++++++++++++++++++++ 2 files changed, 285 insertions(+) create mode 100644 backend/scripts/celery_purge_queue.py create mode 100644 backend/scripts/onyx_redis.py diff --git a/backend/scripts/celery_purge_queue.py b/backend/scripts/celery_purge_queue.py new file mode 100644 index 00000000000..cbaed2de4fe --- /dev/null +++ b/backend/scripts/celery_purge_queue.py @@ -0,0 +1,87 @@ +# Tool to run operations on Celery/Redis in production +# this is a work in progress and isn't completely put together yet +# but can serve as a stub for future operations +import argparse +import logging +from logging import getLogger + +from redis import Redis + +from onyx.background.celery.celery_redis import celery_get_queue_length +from onyx.configs.app_configs import REDIS_DB_NUMBER_CELERY +from onyx.redis.redis_pool import RedisPool + +# Configure the logger +logging.basicConfig( + level=logging.INFO, # Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", # Log format + handlers=[logging.StreamHandler()], # Output logs to console +) + +logger = getLogger(__name__) + +REDIS_PASSWORD = "" + + +def celery_purge_queue(queue: str, tenant_id: str) -> None: + """Purging a celery queue is extremely difficult because the queue is a list + and the only way an item can be removed from a list is by VALUE, which is + a linear scan. Therefore, to purge the list of many values is roughly + n^2. + + The other alternative is to pop values and push them back, but that raises + questions about behavior while operating on a live queue. + """ + + pool = RedisPool.create_pool( + host="127.0.0.1", + port=6380, + db=REDIS_DB_NUMBER_CELERY, + password=REDIS_PASSWORD, + ssl=True, + ssl_cert_reqs="optional", + ssl_ca_certs=None, + ) + + r = Redis(connection_pool=pool) + + length = celery_get_queue_length(queue, r) + + logger.info(f"queue={queue} length={length}") + + # processed = 0 + # deleted = 0 + # for i in range(len(OnyxCeleryPriority)): + # queue_name = queue + # if i > 0: + # queue_name += CELERY_SEPARATOR + # queue_name += str(i) + + # length = r.llen(queue_name) + # for i in range(length): + # task_raw: bytes | None = r.lindex(queue_name, i) + # if not task_raw: + # break + + # processed += 1 + # task_str = task_raw.decode("utf-8") + # task = json.loads(task_str) + # task_kwargs_str = task["headers"]["kwargsrepr"] + # task_kwargs = json.loads(task_kwargs_str) + # task_tenant_id = task_kwargs["tenant_id"] + # if task_tenant_id and task_tenant_id == "tenant_id": + # print("Delete tenant_id={tenant_id}") + # if + # deleted += 1 + + # logger.info(f"processed={processed} deleted={deleted}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Purge celery queue by tenant id") + parser.add_argument("--queue", type=str, help="Queue to purge", required=True) + + parser.add_argument("--tenant", type=str, help="Tenant ID to purge", required=True) + + args = parser.parse_args() + celery_purge_queue(queue=args.queue, tenant_id=args.tenant) diff --git a/backend/scripts/onyx_redis.py b/backend/scripts/onyx_redis.py new file mode 100644 index 00000000000..c7eb7fbef5c --- /dev/null +++ b/backend/scripts/onyx_redis.py @@ -0,0 +1,198 @@ +# Tool to run helpful operations on Redis in production +# This is targeted for internal usage and may not have all the necessary parameters +# for general usage across custom deployments +import argparse +import logging +import sys +import time +from logging import getLogger +from typing import cast + +from redis import Redis + +from onyx.configs.app_configs import REDIS_DB_NUMBER +from onyx.configs.app_configs import REDIS_HOST +from onyx.configs.app_configs import REDIS_PASSWORD +from onyx.configs.app_configs import REDIS_PORT +from onyx.redis.redis_pool import RedisPool + +# Configure the logger +logging.basicConfig( + level=logging.INFO, # Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", # Log format + handlers=[logging.StreamHandler()], # Output logs to console +) + +logger = getLogger(__name__) + +SCAN_ITER_COUNT = 10000 +BATCH_DEFAULT = 1000 + + +def onyx_redis( + command: str, + batch: int, + dry_run: bool, + host: str, + port: int, + db: int, + password: str | None, +) -> int: + pool = RedisPool.create_pool( + host=host, + port=port, + db=db, + password=password if password else "", + ssl=True, + ssl_cert_reqs="optional", + ssl_ca_certs=None, + ) + + r = Redis(connection_pool=pool) + + try: + r.ping() + except: + logger.exception("Redis ping exceptioned") + raise + + if command == "purge_connectorsync_taskset": + """Purge connector tasksets. Used when the tasks represented in the tasksets + have been purged.""" + return purge_by_match_and_type( + "*connectorsync_taskset*", "set", batch, dry_run, r + ) + elif command == "purge_documentset_taskset": + return purge_by_match_and_type( + "*documentset_taskset*", "set", batch, dry_run, r + ) + elif command == "purge_usergroup_taskset": + return purge_by_match_and_type("*usergroup_taskset*", "set", batch, dry_run, r) + elif command == "purge_vespa_syncing": + return purge_by_match_and_type( + "*connectorsync:vespa_syncing*", "string", batch, dry_run, r + ) + else: + pass + + return 255 + + +def flush_batch_delete(batch_keys: list[bytes], r: Redis) -> None: + logger.info(f"Flushing {len(batch_keys)} operations to Redis.") + with r.pipeline() as pipe: + for batch_key in batch_keys: + pipe.delete(batch_key) + pipe.execute() + + +def purge_by_match_and_type( + match_pattern: str, match_type: str, batch_size: int, dry_run: bool, r: Redis +) -> int: + """match_pattern: glob style expression + match_type: https://redis.io/docs/latest/commands/type/ + """ + + # cursor = "0" + # while cursor != 0: + # cursor, data = self.scan( + # cursor=cursor, match=match, count=count, _type=_type, **kwargs + # ) + + start = time.monotonic() + + count = 0 + batch_keys: list[bytes] = [] + for key in r.scan_iter(match_pattern, count=SCAN_ITER_COUNT, _type=match_type): + # key_type = r.type(key) + # if key_type != match_type.encode("utf-8"): + # continue + + key = cast(bytes, key) + key_str = key.decode("utf-8") + + count += 1 + if dry_run: + logger.info(f"(DRY-RUN) Deleting item {count}: {key_str}") + continue + + logger.info(f"Deleting item {count}: {key_str}") + + batch_keys.append(key) + if len(batch_keys) >= batch_size: + flush_batch_delete(batch_keys, r) + batch_keys.clear() + + if len(batch_keys) >= batch_size: + flush_batch_delete(batch_keys, r) + batch_keys.clear() + + logger.info(f"Deleted {count} matches.") + + elapsed = time.monotonic() - start + logger.info(f"Time elapsed: {elapsed:.2f}s") + return 0 + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Onyx Redis Manager") + parser.add_argument("--command", type=str, help="Operation to run", required=True) + + parser.add_argument( + "--host", + type=str, + default=REDIS_HOST, + help="The redis host", + required=False, + ) + + parser.add_argument( + "--port", + type=int, + default=REDIS_PORT, + help="The redis port", + required=False, + ) + + parser.add_argument( + "--db", + type=int, + default=REDIS_DB_NUMBER, + help="The redis db", + required=False, + ) + + parser.add_argument( + "--password", + type=str, + default=REDIS_PASSWORD, + help="The redis password", + required=False, + ) + + parser.add_argument( + "--batch", + type=int, + default=BATCH_DEFAULT, + help="Size of operation batches to send to Redis", + required=False, + ) + + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a dry run without actually executing modifications", + required=False, + ) + + args = parser.parse_args() + exitcode = onyx_redis( + command=args.command, + batch=args.batch, + dry_run=args.dry_run, + host=args.host, + port=args.port, + db=args.db, + password=args.password, + ) + sys.exit(exitcode) From 91e32e801de127998469cdbf4cce43bc5e724a59 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 9 Jan 2025 13:51:58 -0800 Subject: [PATCH 04/29] hope this env var works. --- .../docker-build-push-model-server-container-on-tag.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docker-build-push-model-server-container-on-tag.yml b/.github/workflows/docker-build-push-model-server-container-on-tag.yml index 7df47c416ce..7e47fcbe7b0 100644 --- a/.github/workflows/docker-build-push-model-server-container-on-tag.yml +++ b/.github/workflows/docker-build-push-model-server-container-on-tag.yml @@ -118,6 +118,6 @@ jobs: TRIVY_DB_REPOSITORY: "public.ecr.aws/aquasecurity/trivy-db:2" TRIVY_JAVA_DB_REPOSITORY: "public.ecr.aws/aquasecurity/trivy-java-db:1" with: - image-ref: docker.io/onyxdotapp/onyx-model-server:${{ github.ref_name }} + image-ref: docker.io/${{ env.REGISTRY_IMAGE }}:${{ github.ref_name }} severity: "CRITICAL,HIGH" timeout: "10m" From c55de284238b5a374d98b26030896b01ce9f0d73 Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Thu, 9 Jan 2025 14:15:38 -0800 Subject: [PATCH 05/29] added distinct when outer joining for user filters (#3641) * added distinct when outer joining for user filters * Added distinct when outer joining for user filters for all --- backend/ee/onyx/db/token_limit.py | 1 + backend/onyx/db/connector_credential_pair.py | 1 + backend/onyx/db/credentials.py | 1 + backend/onyx/db/document_set.py | 1 + backend/onyx/db/feedback.py | 1 + backend/onyx/db/persona.py | 1 + 6 files changed, 6 insertions(+) diff --git a/backend/ee/onyx/db/token_limit.py b/backend/ee/onyx/db/token_limit.py index 47d78e1fd26..863f4450315 100644 --- a/backend/ee/onyx/db/token_limit.py +++ b/backend/ee/onyx/db/token_limit.py @@ -24,6 +24,7 @@ def _add_user_filters( if user is None or user.role == UserRole.ADMIN: return stmt + stmt = stmt.distinct() TRLimit_UG = aliased(TokenRateLimit__UserGroup) User__UG = aliased(User__UserGroup) diff --git a/backend/onyx/db/connector_credential_pair.py b/backend/onyx/db/connector_credential_pair.py index 3c796492b11..ea72f1a9507 100644 --- a/backend/onyx/db/connector_credential_pair.py +++ b/backend/onyx/db/connector_credential_pair.py @@ -39,6 +39,7 @@ def _add_user_filters( if user is None or user.role == UserRole.ADMIN: return stmt + stmt = stmt.distinct() UG__CCpair = aliased(UserGroup__ConnectorCredentialPair) User__UG = aliased(User__UserGroup) diff --git a/backend/onyx/db/credentials.py b/backend/onyx/db/credentials.py index 5c135137fbd..86cb31aa811 100644 --- a/backend/onyx/db/credentials.py +++ b/backend/onyx/db/credentials.py @@ -74,6 +74,7 @@ def _add_user_filters( # Basic users can only access credentials that are owned by them return stmt.where(Credential.user_id == user.id) + stmt = stmt.distinct() """ THIS PART IS FOR CURATORS AND GLOBAL CURATORS Here we select cc_pairs by relation: diff --git a/backend/onyx/db/document_set.py b/backend/onyx/db/document_set.py index b5f0dd365aa..750021d29a1 100644 --- a/backend/onyx/db/document_set.py +++ b/backend/onyx/db/document_set.py @@ -40,6 +40,7 @@ def _add_user_filters( if user is None or user.role == UserRole.ADMIN: return stmt + stmt = stmt.distinct() DocumentSet__UG = aliased(DocumentSet__UserGroup) User__UG = aliased(User__UserGroup) """ diff --git a/backend/onyx/db/feedback.py b/backend/onyx/db/feedback.py index f01d8151228..7acf44fd7e4 100644 --- a/backend/onyx/db/feedback.py +++ b/backend/onyx/db/feedback.py @@ -50,6 +50,7 @@ def _add_user_filters( if user is None or user.role == UserRole.ADMIN: return stmt + stmt = stmt.distinct() DocByCC = aliased(DocumentByConnectorCredentialPair) CCPair = aliased(ConnectorCredentialPair) UG__CCpair = aliased(UserGroup__ConnectorCredentialPair) diff --git a/backend/onyx/db/persona.py b/backend/onyx/db/persona.py index d092a4c3275..ec896c5d304 100644 --- a/backend/onyx/db/persona.py +++ b/backend/onyx/db/persona.py @@ -49,6 +49,7 @@ def _add_user_filters( if user is None or user.role == UserRole.ADMIN: return stmt + stmt = stmt.distinct() Persona__UG = aliased(Persona__UserGroup) User__UG = aliased(User__UserGroup) """ From 50131ba22cc369c4ff373ce83a106a1a24b64b30 Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Thu, 9 Jan 2025 15:13:02 -0800 Subject: [PATCH 06/29] Better logging for confluence space permissions --- .../confluence/doc_sync.py | 4 ++- .../connectors/confluence/onyx_confluence.py | 27 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/backend/ee/onyx/external_permissions/confluence/doc_sync.py b/backend/ee/onyx/external_permissions/confluence/doc_sync.py index bd78a8eade4..9805cdad6ee 100644 --- a/backend/ee/onyx/external_permissions/confluence/doc_sync.py +++ b/backend/ee/onyx/external_permissions/confluence/doc_sync.py @@ -24,7 +24,9 @@ def _get_server_space_permissions( confluence_client: OnyxConfluence, space_key: str ) -> ExternalAccess: - space_permissions = confluence_client.get_space_permissions(space_key=space_key) + space_permissions = confluence_client.get_all_space_permissions_server( + space_key=space_key + ) viewspace_permissions = [] for permission_category in space_permissions: diff --git a/backend/onyx/connectors/confluence/onyx_confluence.py b/backend/onyx/connectors/confluence/onyx_confluence.py index d95fa19630e..e6a2b957ee7 100644 --- a/backend/onyx/connectors/confluence/onyx_confluence.py +++ b/backend/onyx/connectors/confluence/onyx_confluence.py @@ -354,6 +354,33 @@ def paginated_group_members_retrieval( group_name = quote(group_name) yield from self._paginate_url(f"rest/api/group/{group_name}/member", limit) + def get_all_space_permissions_server( + self, + space_key: str, + ) -> list[dict[str, Any]]: + """ + This is a confluence server specific method that can be used to + fetch the permissions of a space. + This is better logging than calling the get_space_permissions method + because it returns a jsonrpc response. + """ + url = "rpc/json-rpc/confluenceservice-v2" + data = { + "jsonrpc": "2.0", + "method": "getSpacePermissionSets", + "id": 7, + "params": [space_key], + } + response = self.post(url, data=data) + logger.debug(f"jsonrpc response: {response}") + if not response.get("result"): + logger.warning( + f"No jsonrpc response for space permissions for space {space_key}" + f"\nResponse: {response}" + ) + + return response.get("result", []) + def _validate_connector_configuration( credentials: dict[str, Any], From 962240031fc2c0616a69f5efcb43c9fe8cd6ad00 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 9 Jan 2025 16:29:37 -0800 Subject: [PATCH 07/29] figuring out why multiprocessing set_start_method isn't working. --- backend/onyx/background/celery/apps/app_base.py | 6 ++++-- backend/onyx/background/celery/apps/heavy.py | 1 + backend/onyx/background/celery/apps/indexing.py | 1 + backend/onyx/background/celery/apps/light.py | 2 ++ backend/onyx/background/celery/apps/primary.py | 1 + backend/onyx/background/celery/tasks/indexing/tasks.py | 8 ++++++-- 6 files changed, 15 insertions(+), 4 deletions(-) diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index 22529a66c2b..5e767dfbefc 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -1,5 +1,4 @@ import logging -import multiprocessing import time from typing import Any @@ -163,7 +162,10 @@ def on_task_postrun( def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None: """The first signal sent on celery worker startup""" - multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn + # rkuo: commenting out as set_start_method seems to work here on macOS + # but not in the cloud and it is unclear why. + # logger.info(f"Multiprocessing start method - setting to spawn.") + # multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn def wait_for_redis(sender: Any, **kwargs: Any) -> None: diff --git a/backend/onyx/background/celery/apps/heavy.py b/backend/onyx/background/celery/apps/heavy.py index f45e6df9aa4..ee8958e7dd0 100644 --- a/backend/onyx/background/celery/apps/heavy.py +++ b/backend/onyx/background/celery/apps/heavy.py @@ -56,6 +56,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME) diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py index 9262b632dc2..46282772ff4 100644 --- a/backend/onyx/background/celery/apps/indexing.py +++ b/backend/onyx/background/celery/apps/indexing.py @@ -57,6 +57,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME) diff --git a/backend/onyx/background/celery/apps/light.py b/backend/onyx/background/celery/apps/light.py index e6567b14770..11f1341a1e0 100644 --- a/backend/onyx/background/celery/apps/light.py +++ b/backend/onyx/background/celery/apps/light.py @@ -56,7 +56,9 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") + logger.info(f"Concurrency: {sender.concurrency}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME) SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index caa697f8837..af2105b8c6d 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -80,6 +80,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME) diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 9fd73972d0e..b29dd1e8a08 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -1,3 +1,4 @@ +import multiprocessing import os import sys import time @@ -853,11 +854,14 @@ def connector_indexing_proxy_task( search_settings_id: int, tenant_id: str | None, ) -> None: - """celery tasks are forked, but forking is unstable. This proxies work to a spawned task.""" + """celery tasks are forked, but forking is unstable. + This is a thread that proxies work to a spawned task.""" + task_logger.info( f"Indexing watchdog - starting: attempt={index_attempt_id} " f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" + f"search_settings={search_settings_id} " + f"multiprocessing={multiprocessing.get_start_method()}" ) if not self.request.id: From d972a78f45a696999abf727f9b38c49fada5ea47 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Thu, 9 Jan 2025 17:39:45 -0800 Subject: [PATCH 08/29] Make connector pause and delete fast (#3646) * first cut * refresh on delete --------- Co-authored-by: Richard Kuo (Danswer) --- .../onyx/background/celery/celery_utils.py | 24 ++-- backend/onyx/server/documents/cc_pair.py | 107 +++++------------- .../connector/[ccPairId]/DeletionButton.tsx | 32 ++++-- .../app/admin/connector/[ccPairId]/page.tsx | 2 +- 4 files changed, 69 insertions(+), 96 deletions(-) diff --git a/backend/onyx/background/celery/celery_utils.py b/backend/onyx/background/celery/celery_utils.py index fc6fef1fab5..394dff35258 100644 --- a/backend/onyx/background/celery/celery_utils.py +++ b/backend/onyx/background/celery/celery_utils.py @@ -14,6 +14,7 @@ from onyx.connectors.interfaces import SlimConnector from onyx.connectors.models import Document from onyx.db.connector_credential_pair import get_connector_credential_pair +from onyx.db.enums import ConnectorCredentialPairStatus from onyx.db.enums import TaskStatus from onyx.db.models import TaskQueueState from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface @@ -41,14 +42,21 @@ def _get_deletion_status( return None redis_connector = RedisConnector(tenant_id, cc_pair.id) - if not redis_connector.delete.fenced: - return None - - return TaskQueueState( - task_id="", - task_name=redis_connector.delete.fence_key, - status=TaskStatus.STARTED, - ) + if redis_connector.delete.fenced: + return TaskQueueState( + task_id="", + task_name=redis_connector.delete.fence_key, + status=TaskStatus.STARTED, + ) + + if cc_pair.status == ConnectorCredentialPairStatus.DELETING: + return TaskQueueState( + task_id="", + task_name=redis_connector.delete.fence_key, + status=TaskStatus.PENDING, + ) + + return None def get_deletion_attempt_snapshot( diff --git a/backend/onyx/server/documents/cc_pair.py b/backend/onyx/server/documents/cc_pair.py index cf87469535e..64086de5df0 100644 --- a/backend/onyx/server/documents/cc_pair.py +++ b/backend/onyx/server/documents/cc_pair.py @@ -164,17 +164,12 @@ def update_cc_pair_status( db_session: Session = Depends(get_session), tenant_id: str | None = Depends(get_current_tenant_id), ) -> JSONResponse: - """This method may wait up to 30 seconds if pausing the connector due to the need to - terminate tasks in progress. Tasks are not guaranteed to terminate within the - timeout. + """This method returns nearly immediately. It simply sets some signals and + optimistically assumes any running background processes will clean themselves up. + This is done to improve the perceived end user experience. Returns HTTPStatus.OK if everything finished. - Returns HTTPStatus.ACCEPTED if the connector is being paused, but background tasks - did not finish within the timeout. """ - WAIT_TIMEOUT = 15.0 - still_terminating = False - cc_pair = get_connector_credential_pair_from_id( cc_pair_id=cc_pair_id, db_session=db_session, @@ -188,73 +183,37 @@ def update_cc_pair_status( detail="Connection not found for current user's permissions", ) + redis_connector = RedisConnector(tenant_id, cc_pair_id) if status_update_request.status == ConnectorCredentialPairStatus.PAUSED: + redis_connector.stop.set_fence(True) + search_settings_list: list[SearchSettings] = get_active_search_settings( db_session ) - redis_connector = RedisConnector(tenant_id, cc_pair_id) - - try: - redis_connector.stop.set_fence(True) - while True: - logger.debug( - f"Wait for indexing soft termination starting: cc_pair={cc_pair_id}" - ) - wait_succeeded = redis_connector.wait_for_indexing_termination( - search_settings_list, WAIT_TIMEOUT - ) - if wait_succeeded: - logger.debug( - f"Wait for indexing soft termination succeeded: cc_pair={cc_pair_id}" - ) - break - - logger.debug( - "Wait for indexing soft termination timed out. " - f"Moving to hard termination: cc_pair={cc_pair_id} timeout={WAIT_TIMEOUT:.2f}" - ) - - for search_settings in search_settings_list: - redis_connector_index = redis_connector.new_index( - search_settings.id - ) - if not redis_connector_index.fenced: - continue - - index_payload = redis_connector_index.payload - if not index_payload: - continue - - if not index_payload.celery_task_id: - continue - - # Revoke the task to prevent it from running - primary_app.control.revoke(index_payload.celery_task_id) - - # If it is running, then signaling for termination will get the - # watchdog thread to kill the spawned task - redis_connector_index.set_terminate(index_payload.celery_task_id) - - logger.debug( - f"Wait for indexing hard termination starting: cc_pair={cc_pair_id}" - ) - wait_succeeded = redis_connector.wait_for_indexing_termination( - search_settings_list, WAIT_TIMEOUT - ) - if wait_succeeded: - logger.debug( - f"Wait for indexing hard termination succeeded: cc_pair={cc_pair_id}" - ) - break - - logger.debug( - f"Wait for indexing hard termination timed out: cc_pair={cc_pair_id}" - ) - still_terminating = True - break - finally: - redis_connector.stop.set_fence(False) + while True: + for search_settings in search_settings_list: + redis_connector_index = redis_connector.new_index(search_settings.id) + if not redis_connector_index.fenced: + continue + + index_payload = redis_connector_index.payload + if not index_payload: + continue + + if not index_payload.celery_task_id: + continue + + # Revoke the task to prevent it from running + primary_app.control.revoke(index_payload.celery_task_id) + + # If it is running, then signaling for termination will get the + # watchdog thread to kill the spawned task + redis_connector_index.set_terminate(index_payload.celery_task_id) + + break + else: + redis_connector.stop.set_fence(False) update_connector_credential_pair_from_id( db_session=db_session, @@ -264,14 +223,6 @@ def update_cc_pair_status( db_session.commit() - if still_terminating: - return JSONResponse( - status_code=HTTPStatus.ACCEPTED, - content={ - "message": "Request accepted, background task termination still in progress" - }, - ) - return JSONResponse( status_code=HTTPStatus.OK, content={"message": str(HTTPStatus.OK)} ) diff --git a/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx b/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx index ccef14b5a35..fe430af33fc 100644 --- a/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx +++ b/web/src/app/admin/connector/[ccPairId]/DeletionButton.tsx @@ -8,7 +8,13 @@ import { deleteCCPair } from "@/lib/documentDeletion"; import { mutate } from "swr"; import { buildCCPairInfoUrl } from "./lib"; -export function DeletionButton({ ccPair }: { ccPair: CCPairFullInfo }) { +export function DeletionButton({ + ccPair, + refresh, +}: { + ccPair: CCPairFullInfo; + refresh: () => void; +}) { const { popup, setPopup } = usePopup(); const isDeleting = @@ -31,14 +37,22 @@ export function DeletionButton({ ccPair }: { ccPair: CCPairFullInfo }) { {popup}