diff --git a/README.md b/README.md index 7188faed2fd..86b31be58ab 100644 --- a/README.md +++ b/README.md @@ -133,3 +133,4 @@ Looking to contribute? Please check out the [Contribution Guide](CONTRIBUTING.md ## ⭐Star History [![Star History Chart](https://api.star-history.com/svg?repos=onyx-dot-app/onyx&type=Date)](https://star-history.com/#onyx-dot-app/onyx&Date) + diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 6db15cd832e..0309611acd7 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -20,236 +20,151 @@ # we have a better implementation (backpressure, etc) CLOUD_BEAT_SCHEDULE_MULTIPLIER = 4 -# tasks that only run in the cloud -# the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be filtered -# by the DynamicTenantScheduler -cloud_tasks_to_schedule = [ - # cloud specific tasks - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-alembic", - "task": OnyxCeleryTask.CLOUD_CHECK_ALEMBIC, - "schedule": timedelta(hours=1 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "queue": OnyxCeleryQueues.MONITORING, - "priority": OnyxCeleryPriority.HIGH, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - # remaining tasks are cloud generators for per tenant tasks - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-indexing", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_INDEXING, - }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-connector-deletion", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION, - }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-vespa-sync", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, - }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-prune", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_PRUNING, - }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-vespa-sync", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, +# tasks that run in either self-hosted on cloud +beat_task_templates: list[dict] = [] + +beat_task_templates.extend( + [ + { + "name": "check-for-indexing", + "task": OnyxCeleryTask.CHECK_FOR_INDEXING, + "schedule": timedelta(seconds=15), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, }, - "kwargs": { - "task_name": OnyxCeleryTask.MONITOR_VESPA_SYNC, + { + "name": "check-for-connector-deletion", + "task": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION, + "schedule": timedelta(seconds=20), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-doc-permissions-sync", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=30 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, + { + "name": "check-for-vespa-sync", + "task": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, + "schedule": timedelta(seconds=20), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, + { + "name": "check-for-pruning", + "task": OnyxCeleryTask.CHECK_FOR_PRUNING, + "schedule": timedelta(hours=1), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-external-group-sync", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, + { + "name": "monitor-vespa-sync", + "task": OnyxCeleryTask.MONITOR_VESPA_SYNC, + "schedule": timedelta(seconds=5), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, + { + "name": "check-for-doc-permissions-sync", + "task": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, + "schedule": timedelta(seconds=30), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, }, - }, - { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-background-processes", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(minutes=5 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), - "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, + { + "name": "check-for-external-group-sync", + "task": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, + "schedule": timedelta(seconds=20), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, }, - "kwargs": { - "task_name": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, - "queue": OnyxCeleryQueues.MONITORING, - "priority": OnyxCeleryPriority.LOW, + { + "name": "monitor-background-processes", + "task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, + "schedule": timedelta(minutes=5), + "options": { + "priority": OnyxCeleryPriority.LOW, + "expires": BEAT_EXPIRES_DEFAULT, + "queue": OnyxCeleryQueues.MONITORING, + }, }, - }, -] + ] +) +# Only add the LLM model update task if the API URL is configured if LLM_MODEL_UPDATE_API_URL: - cloud_tasks_to_schedule.append( + beat_task_templates.append( { - "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-llm-model-update", - "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta( - hours=1 * CLOUD_BEAT_SCHEDULE_MULTIPLIER - ), # Check every hour + "name": "check-for-llm-model-update", + "task": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE, + "schedule": timedelta(hours=1), # Check every hour "options": { - "priority": OnyxCeleryPriority.HIGHEST, - "expires": BEAT_EXPIRES_DEFAULT, - }, - "kwargs": { - "task_name": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE, "priority": OnyxCeleryPriority.LOW, + "expires": BEAT_EXPIRES_DEFAULT, }, } ) -# tasks that run in either self-hosted on cloud -tasks_to_schedule: list[dict] = [] -if not MULTI_TENANT: - tasks_to_schedule.extend( - [ - { - "name": "check-for-indexing", - "task": OnyxCeleryTask.CHECK_FOR_INDEXING, - "schedule": timedelta(seconds=15), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-connector-deletion", - "task": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION, - "schedule": timedelta(seconds=20), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-vespa-sync", - "task": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, - "schedule": timedelta(seconds=20), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-pruning", - "task": OnyxCeleryTask.CHECK_FOR_PRUNING, - "schedule": timedelta(hours=1), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "monitor-vespa-sync", - "task": OnyxCeleryTask.MONITOR_VESPA_SYNC, - "schedule": timedelta(seconds=5), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-doc-permissions-sync", - "task": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, - "schedule": timedelta(seconds=30), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-external-group-sync", - "task": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, - "schedule": timedelta(seconds=20), - "options": { - "priority": OnyxCeleryPriority.MEDIUM, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "monitor-background-processes", - "task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, - "schedule": timedelta(minutes=15), - "options": { - "priority": OnyxCeleryPriority.LOW, - "expires": BEAT_EXPIRES_DEFAULT, - "queue": OnyxCeleryQueues.MONITORING, - }, - }, - ] - ) +def make_cloud_generator_task(task: dict[str, Any]) -> dict[str, Any]: + cloud_task: dict[str, Any] = {} + + # constant options for cloud beat task generators + task_schedule: timedelta = task["schedule"] + cloud_task["schedule"] = task_schedule * CLOUD_BEAT_SCHEDULE_MULTIPLIER + cloud_task["options"] = {} + cloud_task["options"]["priority"] = OnyxCeleryPriority.HIGHEST + cloud_task["options"]["expires"] = BEAT_EXPIRES_DEFAULT + + # settings dependent on the original task + cloud_task["name"] = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_{task['name']}" + cloud_task["task"] = OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR + cloud_task["kwargs"] = {} + cloud_task["kwargs"]["task_name"] = task["task"] + + optional_fields = ["queue", "priority", "expires"] + for field in optional_fields: + if field in task["options"]: + cloud_task["kwargs"][field] = task["options"][field] + + return cloud_task + + +# tasks that only run in the cloud +# the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to be filtered +# by the DynamicTenantScheduler +cloud_tasks_to_schedule: list[dict] = [ + # cloud specific tasks + { + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-alembic", + "task": OnyxCeleryTask.CLOUD_CHECK_ALEMBIC, + "schedule": timedelta(hours=1), + "options": { + "queue": OnyxCeleryQueues.MONITORING, + "priority": OnyxCeleryPriority.HIGH, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, +] + +# generate our cloud and self-hosted beat tasks from the templates +for beat_task_template in beat_task_templates: + cloud_task = make_cloud_generator_task(beat_task_template) + cloud_tasks_to_schedule.append(cloud_task) - # Only add the LLM model update task if the API URL is configured - if LLM_MODEL_UPDATE_API_URL: - tasks_to_schedule.append( - { - "name": "check-for-llm-model-update", - "task": OnyxCeleryTask.CHECK_FOR_LLM_MODEL_UPDATE, - "schedule": timedelta(hours=1), # Check every hour - "options": { - "priority": OnyxCeleryPriority.LOW, - "expires": BEAT_EXPIRES_DEFAULT, - }, - } - ) +tasks_to_schedule: list[dict] = [] +if not MULTI_TENANT: + tasks_to_schedule = beat_task_templates def get_cloud_tasks_to_schedule() -> list[dict[str, Any]]: