Feature/indexing hard timeout 3 #3980

Merged
merged 13 commits
Feb 19, 2025
74 changes: 71 additions & 3 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -41,6 +41,7 @@
from onyx.configs.app_configs import VESPA_CLOUD_KEY_PATH
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT
from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
@@ -90,6 +91,9 @@ class IndexingWatchdogTerminalStatus(str, Enum):
SUCCEEDED = "succeeded"

SPAWN_FAILED = "spawn_failed" # connector spawn failed
SPAWN_NOT_ALIVE = (
"spawn_not_alive" # spawn succeeded but process did not come alive
)

BLOCKED_BY_DELETION = "blocked_by_deletion"
BLOCKED_BY_STOP_SIGNAL = "blocked_by_stop_signal"
@@ -112,6 +116,8 @@ class IndexingWatchdogTerminalStatus(str, Enum):
# the watchdog terminated the task due to no activity
TERMINATED_BY_ACTIVITY_TIMEOUT = "terminated_by_activity_timeout"

# NOTE: this may actually be the same event as SIGKILL, just reported differently by Python
# consolidate the two statuses once we know more
OUT_OF_MEMORY = "out_of_memory"

PROCESS_SIGNAL_SIGKILL = "process_signal_sigkill"
@@ -137,6 +143,7 @@ def code(self) -> int:
def from_code(cls, code: int) -> "IndexingWatchdogTerminalStatus":
_CODE_TO_ENUM: dict[int, IndexingWatchdogTerminalStatus] = {
-9: IndexingWatchdogTerminalStatus.PROCESS_SIGNAL_SIGKILL,
137: IndexingWatchdogTerminalStatus.OUT_OF_MEMORY,
248: IndexingWatchdogTerminalStatus.BLOCKED_BY_DELETION,
249: IndexingWatchdogTerminalStatus.BLOCKED_BY_STOP_SIGNAL,
250: IndexingWatchdogTerminalStatus.FENCE_NOT_FOUND,
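
A note on the two "killed" codes in this mapping (background context, not part of the diff): Python's multiprocessing reports a child killed by a signal as a negative exit code (-signum), while shells and container runtimes report 128 + signum. SIGKILL is signal 9, so the same kill can surface as either -9 or 137, which fits the NOTE above that OUT_OF_MEMORY may be a differently-parsed SIGKILL, since OOM kills are delivered as SIGKILL:

import signal

# SIGKILL is signal 9: multiprocessing reports it as exitcode -9,
# while a container runtime reports 128 + 9 = 137
assert int(signal.SIGKILL) == 9
assert 128 + int(signal.SIGKILL) == 137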
@@ -764,9 +771,9 @@ def connector_indexing_task(
callback = IndexingCallback(
os.getppid(),
redis_connector,
redis_connector_index,
lock,
r,
redis_connector_index,
)

logger.info(
@@ -912,7 +919,7 @@ def connector_indexing_proxy_task(
pure=False,
)

if not job:
if not job or not job.process:
result.status = IndexingWatchdogTerminalStatus.SPAWN_FAILED
task_logger.info(
log_builder.build(
@@ -923,7 +930,33 @@
)
return

task_logger.info(log_builder.build("Indexing watchdog - spawn succeeded"))
# Ensure the process has moved out of the starting state
num_waits = 0
while True:
if num_waits > 15:
result.status = IndexingWatchdogTerminalStatus.SPAWN_NOT_ALIVE
task_logger.info(
log_builder.build(
"Indexing watchdog - finished",
status=str(result.status.value),
exit_code=str(result.exit_code),
)
)
job.release()
return

if job.process.is_alive() or job.process.exitcode is not None:
break

sleep(1)
num_waits += 1

task_logger.info(
log_builder.build(
"Indexing watchdog - spawn succeeded",
pid=str(job.process.pid),
)
)

redis_connector = RedisConnector(tenant_id, cc_pair_id)
redis_connector_index = redis_connector.new_index(search_settings_id)
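
The polling loop above guards against a spawned process that never leaves the starting state. A minimal standalone sketch of the same idiom (illustrative only; in the PR, job.process wraps a multiprocessing.Process-like object):

import multiprocessing as mp
import time

def _work() -> None:
    pass  # stand-in for the spawned indexing entry point

if __name__ == "__main__":
    proc = mp.Process(target=_work, daemon=True)
    proc.start()

    # wait up to ~15s for the child to come alive or finish;
    # checking exitcode as well covers children that exit very quickly
    for _ in range(15):
        if proc.is_alive() or proc.exitcode is not None:
            break
        time.sleep(1)
    else:
        proc.terminate()  # give up, as the watchdog's SPAWN_NOT_ALIVE path does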
@@ -940,6 +973,9 @@ def connector_indexing_proxy_task(
index_attempt.connector_credential_pair.connector.source.value
)

redis_connector_index.set_active() # renew active signal
redis_connector_index.set_connector_active() # prime the connector active signal

while True:
sleep(5)

@@ -974,6 +1010,38 @@
result.status = IndexingWatchdogTerminalStatus.TERMINATED_BY_SIGNAL
break

if not redis_connector_index.connector_active():
task_logger.warning(
log_builder.build(
"Indexing watchdog - activity timeout exceeded",
timeout=f"{CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT}s",
)
)

try:
with get_session_with_tenant(tenant_id) as db_session:
mark_attempt_failed(
index_attempt_id,
db_session,
"Indexing watchdog - activity timeout exceeded: "
f"attempt={index_attempt_id} "
f"timeout={CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT}s",
)
except Exception:
# if the DB call raises, we'll just get an unfriendly failure message
# in the UI instead of the cancellation message
logger.exception(
log_builder.build(
"Indexing watchdog - transient exception marking index attempt as failed"
)
)

job.cancel()
result.status = (
IndexingWatchdogTerminalStatus.TERMINATED_BY_ACTIVITY_TIMEOUT
)
break

# if the spawned task is still running and the index attempt is not in a
# finished status, loop around and run the check again
try:
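
Condensed, the new activity-timeout path in connector_indexing_proxy_task reduces to the following shape (a sketch with hypothetical callables standing in for the Redis keepalive check, the DB update, and the job handle):

import time

def watch(job, connector_active, mark_attempt_failed, timeout_s: int) -> str:
    """Poll the spawned job; hard-kill it if its keepalive signal expires."""
    while True:
        time.sleep(5)
        if not job.process.is_alive():
            return "succeeded"
        if not connector_active():
            # no progress signal for timeout_s seconds: assume a hung connector
            mark_attempt_failed(f"activity timeout exceeded: {timeout_s}s")
            job.cancel()
            return "terminated_by_activity_timeout"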
32 changes: 23 additions & 9 deletions backend/onyx/background/celery/tasks/indexing/utils.py
@@ -93,27 +93,25 @@ def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[
return unfenced_attempts


class IndexingCallback(IndexingHeartbeatInterface):
class IndexingCallbackBase(IndexingHeartbeatInterface):
PARENT_CHECK_INTERVAL = 60

def __init__(
self,
parent_pid: int,
redis_connector: RedisConnector,
redis_connector_index: RedisConnectorIndex,
redis_lock: RedisLock,
redis_client: Redis,
):
super().__init__()
self.parent_pid = parent_pid
self.redis_connector: RedisConnector = redis_connector
self.redis_connector_index: RedisConnectorIndex = redis_connector_index
self.redis_lock: RedisLock = redis_lock
self.redis_client = redis_client
self.started: datetime = datetime.now(timezone.utc)
self.redis_lock.reacquire()

self.last_tag: str = "IndexingCallback.__init__"
self.last_tag: str = f"{self.__class__.__name__}.__init__"
self.last_lock_reacquire: datetime = datetime.now(timezone.utc)
self.last_lock_monotonic = time.monotonic()

@@ -127,8 +125,8 @@ def should_stop(self) -> bool:

def progress(self, tag: str, amount: int) -> None:
# rkuo: this shouldn't be necessary yet because we spawn the process this runs inside
# with daemon = True. It seems likely some indexing tasks will need to spawn other processes eventually
# so leave this code in until we're ready to test it.
# with daemon=True. It seems likely some indexing tasks will need to spawn other processes
# eventually, which daemon=True prevents, so leave this code in until we're ready to test it.

# if self.parent_pid:
# # check if the parent pid is alive so we aren't running as a zombie
@@ -143,8 +141,6 @@ def progress(self, tag: str, amount: int) -> None:
# self.last_parent_check = now

try:
self.redis_connector.prune.set_active()

current_time = time.monotonic()
if current_time - self.last_lock_monotonic >= (
CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4
@@ -156,7 +152,7 @@ def progress(self, tag: str, amount: int) -> None:
self.last_tag = tag
except LockError:
logger.exception(
f"IndexingCallback - lock.reacquire exceptioned: "
f"{self.__class__.__name__} - lock.reacquire exceptioned: "
f"lock_timeout={self.redis_lock.timeout} "
f"start={self.started} "
f"last_tag={self.last_tag} "
@@ -167,6 +163,24 @@ def progress(self, tag: str, amount: int) -> None:
redis_lock_dump(self.redis_lock, self.redis_client)
raise


class IndexingCallback(IndexingCallbackBase):
def __init__(
self,
parent_pid: int,
redis_connector: RedisConnector,
redis_lock: RedisLock,
redis_client: Redis,
redis_connector_index: RedisConnectorIndex,
):
super().__init__(parent_pid, redis_connector, redis_lock, redis_client)

self.redis_connector_index: RedisConnectorIndex = redis_connector_index

def progress(self, tag: str, amount: int) -> None:
self.redis_connector_index.set_active()
self.redis_connector_index.set_connector_active()
super().progress(tag, amount)
self.redis_client.incrby(
self.redis_connector_index.generator_progress_key, amount
)
13 changes: 9 additions & 4 deletions backend/onyx/background/celery/tasks/pruning/tasks.py
@@ -21,7 +21,7 @@
from onyx.background.celery.celery_redis import celery_get_queued_task_ids
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.celery_utils import extract_ids_from_runnable_connector
from onyx.background.celery.tasks.indexing.utils import IndexingCallback
from onyx.background.celery.tasks.indexing.utils import IndexingCallbackBase
from onyx.configs.app_configs import ALLOW_SIMULTANEOUS_PRUNING
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
@@ -62,6 +62,12 @@
logger = setup_logger()


class PruneCallback(IndexingCallbackBase):
def progress(self, tag: str, amount: int) -> None:
self.redis_connector.prune.set_active()
super().progress(tag, amount)


"""Jobs / utils for kicking off pruning tasks."""


@@ -434,12 +440,11 @@ def connector_pruning_generator_task(
)

search_settings = get_current_search_settings(db_session)
redis_connector_index = redis_connector.new_index(search_settings.id)
redis_connector.new_index(search_settings.id)

callback = IndexingCallback(
callback = PruneCallback(
0,
redis_connector,
redis_connector_index,
lock,
r,
)
13 changes: 11 additions & 2 deletions backend/onyx/configs/constants.py
@@ -98,9 +98,18 @@

CELERY_PRIMARY_WORKER_LOCK_TIMEOUT = 120

# needs to be long enough to cover the maximum time it takes to download an object
# if we can get callbacks as object bytes download, we could lower this a lot.
CELERY_INDEXING_LOCK_TIMEOUT = 3 * 60 * 60  # 60 min

# hard timeout applied by the watchdog to the indexing connector run
# to handle hung connectors
CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT = 3 * 60 * 60  # 3 hours (in seconds)

# soft timeout for the lock taken by the indexing connector run
# allows the lock to eventually expire if the managing code around it dies
# CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT + 15 minutes
# hard termination should always fire first if the connector is hung
CELERY_INDEXING_LOCK_TIMEOUT = CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT + 900
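
A quick sanity check of the relationship between the two values (arithmetic only, not code from this PR):

watchdog_timeout = 3 * 60 * 60         # 10800s = 3 hours
lock_timeout = watchdog_timeout + 900  # 11700s = 3 hours 15 minutes
# the hard kill always fires before the soft lock expiry
assert lock_timeout > watchdog_timeout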


# how long a task should wait for associated fence to be ready
CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT = 5 * 60 # 5 min
29 changes: 28 additions & 1 deletion backend/onyx/redis/redis_connector_index.py
@@ -6,6 +6,7 @@
import redis
from pydantic import BaseModel

from onyx.configs.constants import CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT
from onyx.configs.constants import OnyxRedisConstants


@@ -45,6 +46,10 @@ class RedisConnectorIndex:
WATCHDOG_PREFIX = PREFIX + "_watchdog"
WATCHDOG_TTL = 300

# used to signal that the connector itself is still running
CONNECTOR_ACTIVE_PREFIX = PREFIX + "_connector_active"
CONNECTOR_ACTIVE_TTL = CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT

def __init__(
self,
tenant_id: str | None,
@@ -68,9 +73,13 @@ def __init__(
f"{self.GENERATOR_LOCK_PREFIX}_{id}/{search_settings_id}"
)
self.terminate_key = f"{self.TERMINATE_PREFIX}_{id}/{search_settings_id}"
self.active_key = f"{self.ACTIVE_PREFIX}_{id}/{search_settings_id}"
self.watchdog_key = f"{self.WATCHDOG_PREFIX}_{id}/{search_settings_id}"

self.active_key = f"{self.ACTIVE_PREFIX}_{id}/{search_settings_id}"
self.connector_active_key = (
f"{self.CONNECTOR_ACTIVE_PREFIX}_{id}/{search_settings_id}"
)

@classmethod
def fence_key_with_ids(cls, cc_pair_id: int, search_settings_id: int) -> str:
return f"{cls.FENCE_PREFIX}_{cc_pair_id}/{search_settings_id}"
@@ -156,6 +165,20 @@ def active(self) -> bool:

return False

def set_connector_active(self) -> None:
"""This sets a signal to keep the indexing flow from getting cleaned up within
the expiration time.

The slack in timing is needed because simply checking
the celery queue and task status is subject to race conditions."""
self.redis.set(self.connector_active_key, 0, ex=self.CONNECTOR_ACTIVE_TTL)

def connector_active(self) -> bool:
if self.redis.exists(self.connector_active_key):
return True

return False

def generator_locked(self) -> bool:
if self.redis.exists(self.generator_lock_key):
return True
@@ -194,6 +217,7 @@ def get_completion(self) -> int | None:

def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.connector_active_key)
self.redis.delete(self.active_key)
self.redis.delete(self.generator_lock_key)
self.redis.delete(self.generator_progress_key)
@@ -203,6 +227,9 @@ def reset(self) -> None:
@staticmethod
def reset_all(r: redis.Redis) -> None:
"""Deletes all redis values for all connectors"""
for key in r.scan_iter(RedisConnectorIndex.CONNECTOR_ACTIVE_PREFIX + "*"):
r.delete(key)

for key in r.scan_iter(RedisConnectorIndex.ACTIVE_PREFIX + "*"):
r.delete(key)

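
The connector_active key is a standard Redis TTL keepalive: the worker refreshes it on every progress callback, and its absence tells the watchdog that the connector has made no progress for the TTL window. A minimal standalone sketch of the pattern (the connection details and key name are illustrative, not taken from the PR):

import redis

r = redis.Redis()  # assumes a local Redis; illustrative only
KEY = "connectorindex_connector_active_1/2"  # hypothetical cc_pair 1, search settings 2
TTL = 3 * 60 * 60  # mirrors CONNECTOR_ACTIVE_TTL

def on_progress() -> None:
    # worker side: refresh the signal whenever the connector reports progress
    r.set(KEY, 0, ex=TTL)

def connector_is_hung() -> bool:
    # watchdog side: key expiry means no progress for TTL seconds
    return not r.exists(KEY)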