Skip to content

Commit

Permalink
move lock_beat test outside the try catch so that we don't worry abou…
Browse files Browse the repository at this point in the history
…t testing locks we never took
  • Loading branch information
Richard Kuo (Danswer) committed Jan 7, 2025
1 parent 59934e6 commit 1f82a3d
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ def check_for_connector_deletion_task(
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

try:
# collect cc_pair_ids
cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

try:
# get all cc pairs that need to be synced
cc_pair_ids_to_sync: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

try:
cc_pair_ids_to_sync: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
cc_pairs = get_all_auto_sync_cc_pairs(db_session)
Expand Down
8 changes: 4 additions & 4 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,11 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

try:
locked = True

# check for search settings swap
Expand Down
8 changes: 4 additions & 4 deletions backend/onyx/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

try:
cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
cc_pairs = get_connector_credential_pairs(db_session)
Expand Down
19 changes: 9 additions & 10 deletions backend/onyx/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

try:
with get_session_with_tenant(tenant_id) as db_session:
try_generate_stale_document_sync_tasks(
self.app, db_session, r, lock_beat, tenant_id
Expand Down Expand Up @@ -755,7 +755,7 @@ def monitor_ccpair_indexing_taskset(


@shared_task(name=OnyxCeleryTask.MONITOR_VESPA_SYNC, soft_time_limit=300, bind=True)
def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
"""This is a celery beat task that monitors and finalizes metadata sync tasksets.
It scans for fence values and then gets the counts of any associated tasksets.
If the count is 0, that means all tasks finished and we should clean up.
Expand All @@ -779,12 +779,11 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# prevent overlapping tasks
if not lock_beat.acquire(blocking=False):
task_logger.info("monitor_vespa_sync exiting due to overlap")
return False
# prevent overlapping tasks
if not lock_beat.acquire(blocking=False):
return None

try:
# print current queue lengths
phase_start = time.monotonic()
# we don't need every tenant polling redis for this info.
Expand Down

0 comments on commit 1f82a3d

Please sign in to comment.