Bugfix/termination cleanup (#4077)
* move activity timeout cleanup to the function exit

* fix excessive logging

---------

Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
rkuo-danswer and Richard Kuo (Danswer) authored Feb 24, 2025
1 parent 076619c commit 558bbe1
Showing 2 changed files with 28 additions and 25 deletions.
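The first commit bullet, moving the activity timeout cleanup to the function exit, is a control-flow refactor: each detection site in the watchdog loop now only records a terminal status and breaks, and all cleanup (DB writes, `job.cancel()`) runs once at a single exit point keyed on that status. Below is a minimal, self-contained sketch of that pattern, not the Onyx implementation; `TerminalStatus`, `watchdog_loop`, and the callback parameters are invented for illustration.

```python
# Minimal sketch of the pattern this commit applies (invented names, not the
# Onyx code): detection sites only record WHY the loop is exiting; a single
# exit point performs all cleanup, keyed on that terminal status.
import enum


class TerminalStatus(enum.Enum):
    SUCCEEDED = "succeeded"
    TERMINATED_BY_SIGNAL = "terminated_by_signal"
    TERMINATED_BY_ACTIVITY_TIMEOUT = "terminated_by_activity_timeout"


def watchdog_loop(job_done, termination_requested, connector_active) -> TerminalStatus:
    status = TerminalStatus.SUCCEEDED
    while not job_done():
        if termination_requested():
            # record the reason and break; cleanup happens at the exit point
            status = TerminalStatus.TERMINATED_BY_SIGNAL
            break
        if not connector_active():
            status = TerminalStatus.TERMINATED_BY_ACTIVITY_TIMEOUT
            break

    # single exit point: every abnormal path funnels through one cleanup block
    if status is TerminalStatus.TERMINATED_BY_SIGNAL:
        print("cleanup: mark attempt canceled, cancel job")
    elif status is TerminalStatus.TERMINATED_BY_ACTIVITY_TIMEOUT:
        print("cleanup: mark attempt failed, cancel job")
    return status


if __name__ == "__main__":
    # example: a connector that goes inactive on the second poll
    polls = iter([True, False])
    print(watchdog_loop(lambda: False, lambda: False, lambda: next(polls)))
```

The payoff, echoed by the function's own TODO about a "single return path", is that a new termination reason cannot silently skip a cleanup step.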
48 changes: 24 additions & 24 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -899,6 +899,9 @@ def connector_indexing_proxy_task(
TODO(rkuo): refactor this so that there is a single return path where we canonically
log the result of running this function.
NOTE: we try/except all db access in this function because as a watchdog, this function
needs to be extremely stable.
"""
start = time.monotonic()

@@ -1016,7 +1019,7 @@ def connector_indexing_proxy_task(
job.release()
break

# if a termination signal is detected, clean up and break
# if a termination signal is detected, break (exit point will clean up)
if self.request.id and redis_connector_index.terminating(self.request.id):
task_logger.warning(
log_builder.build("Indexing watchdog - termination signal detected")
@@ -1025,6 +1028,7 @@
result.status = IndexingWatchdogTerminalStatus.TERMINATED_BY_SIGNAL
break

# if activity timeout is detected, break (exit point will clean up)
if not redis_connector_index.connector_active():
task_logger.warning(
log_builder.build(
@@ -1033,25 +1037,6 @@ def connector_indexing_proxy_task(
)
)

try:
with get_session_with_current_tenant() as db_session:
mark_attempt_failed(
index_attempt_id,
db_session,
"Indexing watchdog - activity timeout exceeded: "
f"attempt={index_attempt_id} "
f"timeout={CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT}s",
)
except Exception:
# if the DB exceptions, we'll just get an unfriendly failure message
# in the UI instead of the cancellation message
logger.exception(
log_builder.build(
"Indexing watchdog - transient exception marking index attempt as failed"
)
)

job.cancel()
result.status = (
IndexingWatchdogTerminalStatus.TERMINATED_BY_ACTIVITY_TIMEOUT
)
@@ -1071,8 +1056,6 @@ def connector_indexing_proxy_task(
if not index_attempt.is_finished():
continue
except Exception:
# if the DB exceptioned, just restart the check.
# polling the index attempt status doesn't need to be strongly consistent
task_logger.exception(
log_builder.build(
"Indexing watchdog - transient exception looking up index attempt"
@@ -1139,15 +1122,32 @@ def connector_indexing_proxy_task(
"Connector termination signal detected",
)
except Exception:
# if the DB exceptions, we'll just get an unfriendly failure message
# in the UI instead of the cancellation message
task_logger.exception(
log_builder.build(
"Indexing watchdog - transient exception marking index attempt as canceled"
)
)

job.cancel()
elif result.status == IndexingWatchdogTerminalStatus.TERMINATED_BY_ACTIVITY_TIMEOUT:
try:
with get_session_with_current_tenant() as db_session:
mark_attempt_failed(
index_attempt_id,
db_session,
"Indexing watchdog - activity timeout exceeded: "
f"attempt={index_attempt_id} "
f"timeout={CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT}s",
)
except Exception:
logger.exception(
log_builder.build(
"Indexing watchdog - transient exception marking index attempt as failed"
)
)
job.cancel()
else:
pass

task_logger.info(
log_builder.build(
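The docstring NOTE added above ("we try/except all db access in this function because as a watchdog, this function needs to be extremely stable") is the convention behind both try/except blocks in this diff. A hedged sketch of that convention follows; `save_failure_to_db` is an invented stand-in for Onyx's `mark_attempt_failed`, and the simulated outage exists only to make the example runnable.

```python
# Sketch of the "watchdog must survive DB failures" convention (assumed shape;
# save_failure_to_db is a stand-in for Onyx's mark_attempt_failed).
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def save_failure_to_db(attempt_id: int, reason: str) -> None:
    raise ConnectionError("simulated transient DB outage")


def record_failure_best_effort(attempt_id: int, reason: str) -> None:
    try:
        save_failure_to_db(attempt_id, reason)
    except Exception:
        # Swallow the error: the worst case is an unfriendly generic failure
        # message in the UI instead of this specific one. The watchdog itself
        # must keep running no matter what the DB does.
        logger.exception("transient exception marking attempt %s failed", attempt_id)


record_failure_best_effort(42, "activity timeout exceeded")
```

The trade-off, spelled out in the commit's own comments, is that a swallowed DB error only costs a friendlier UI message, whereas a crashed watchdog would leak the job entirely.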
5 changes: 4 additions & 1 deletion backend/onyx/indexing/indexing_pipeline.py
@@ -173,7 +173,10 @@ def index_doc_batch_with_handler(
tenant_id=tenant_id,
)
except Exception as e:
logger.exception(f"Failed to index document batch: {document_batch}")
# don't log the batch directly, it's too much text
document_ids = [doc.id for doc in document_batch]
logger.exception(f"Failed to index document batch: {document_ids}")

index_pipeline_result = IndexingPipelineResult(
new_docs=0,
total_docs=len(document_batch),
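The second file's change implements the "fix excessive logging" bullet: on failure, the pipeline previously logged the entire `document_batch`, which, per the new comment, is far too much text; now it logs only the document IDs. A standalone sketch under assumed types (the `Document` dataclass here is invented; the diff only shows that Onyx documents expose an `.id` attribute):

```python
# Sketch of the logging fix: on failure, log document IDs, not full contents.
import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Document:  # invented stand-in; Onyx documents also carry an .id
    id: str
    text: str


def index_batch(batch: list[Document]) -> None:
    raise RuntimeError("simulated indexing failure")


document_batch = [Document("doc-1", "x" * 1_000_000), Document("doc-2", "y" * 1_000_000)]
try:
    index_batch(document_batch)
except Exception:
    # don't log the batch directly, it's too much text
    document_ids = [doc.id for doc in document_batch]
    logger.exception(f"Failed to index document batch: {document_ids}")
```

The IDs are still enough to locate the failing documents when debugging, without flooding the log with document bodies.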
