Skip to content

Commit

Permalink
Figuring out why multiprocessing's set_start_method isn't working.
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Kuo (Danswer) committed Jan 10, 2025
1 parent 4392173 commit 9622400
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 4 deletions.
6 changes: 4 additions & 2 deletions backend/onyx/background/celery/apps/app_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import multiprocessing
import time
from typing import Any

Expand Down Expand Up @@ -163,7 +162,10 @@ def on_task_postrun(

def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None:
"""The first signal sent on celery worker startup"""
multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn
# rkuo: commenting this out — set_start_method seems to work here on macOS,
# but not in the cloud, and it is unclear why.
# logger.info(f"Multiprocessing start method - setting to spawn.")
# multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn


def wait_for_redis(sender: Any, **kwargs: Any) -> None:
Expand Down
1 change: 1 addition & 0 deletions backend/onyx/background/celery/apps/heavy.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None
@worker_init.connect
def on_worker_init(sender: Any, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn
logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")

SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
Expand Down
1 change: 1 addition & 0 deletions backend/onyx/background/celery/apps/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None
@worker_init.connect
def on_worker_init(sender: Any, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn
logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")

SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME)
Expand Down
2 changes: 2 additions & 0 deletions backend/onyx/background/celery/apps/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None
@worker_init.connect
def on_worker_init(sender: Any, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn
logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")
logger.info(f"Concurrency: {sender.concurrency}")

SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME)
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8)
Expand Down
1 change: 1 addition & 0 deletions backend/onyx/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None
@worker_init.connect
def on_worker_init(sender: Any, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn
logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")

SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME)
Expand Down
8 changes: 6 additions & 2 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import multiprocessing
import os
import sys
import time
Expand Down Expand Up @@ -853,11 +854,14 @@ def connector_indexing_proxy_task(
search_settings_id: int,
tenant_id: str | None,
) -> None:
"""celery tasks are forked, but forking is unstable. This proxies work to a spawned task."""
"""celery tasks are forked, but forking is unstable.
This is a thread that proxies work to a spawned task."""

task_logger.info(
f"Indexing watchdog - starting: attempt={index_attempt_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
f"search_settings={search_settings_id} "
f"multiprocessing={multiprocessing.get_start_method()}"
)

if not self.request.id:
Expand Down

0 comments on commit 9622400

Please sign in to comment.