Skip to content

Commit

Permalink
✨ [open-zaak/open-zaak#1203] Retry vars configurable via admin
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenbal committed Apr 16, 2024
1 parent 9f8a46a commit 2aa6a0f
Showing 1 changed file with 67 additions and 10 deletions.
77 changes: 67 additions & 10 deletions src/nrc/api/tasks.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,79 @@
import json
import logging

from django.conf import settings
from django.core.management import call_command
from django.core.serializers.json import DjangoJSONEncoder
from django.utils.translation import gettext_lazy as _

import requests
from celery.exceptions import Ignore, Retry
from celery.utils.time import get_exponential_backoff_interval
from vine.utils import wraps
from vng_api_common.notifications.models import NotificationsConfig

from nrc.celery import app
from nrc.datamodel.models import Abonnement, NotificatieResponse

logger = logging.getLogger(__name__)


def add_autoretry_behaviour(task, **options):
"""
Adapted from celery to use admin configurable autoretry settings
"""
autoretry_for = tuple(
options.get("autoretry_for", getattr(task, "autoretry_for", ()))
)
retry_kwargs = options.get("retry_kwargs", getattr(task, "retry_kwargs", {}))
retry_jitter = options.get("retry_jitter", getattr(task, "retry_jitter", True))

if autoretry_for and not hasattr(task, "_orig_run"):

@wraps(task.run)
def run(*args, **kwargs):
config = NotificationsConfig.get_solo()
max_retries = config.notification_delivery_max_retries
retry_backoff = config.notification_delivery_retry_backoff
retry_backoff_max = config.notification_delivery_retry_backoff_max

task.max_retries = max_retries

try:
return task._orig_run(*args, **kwargs)
except Ignore:
# If Ignore signal occurs task shouldn't be retried,
# even if it suits autoretry_for list
raise
except Retry:
raise
except autoretry_for as exc:
if retry_backoff:
retry_kwargs["countdown"] = get_exponential_backoff_interval(
factor=retry_backoff,
retries=task.request.retries,
maximum=retry_backoff_max,
full_jitter=retry_jitter,
)
# Override max_retries
if hasattr(task, "override_max_retries"):
retry_kwargs["max_retries"] = getattr(
task, "override_max_retries", max_retries
)

ret = task.retry(exc=exc, **retry_kwargs)
# Stop propagation
if hasattr(task, "override_max_retries"):
delattr(task, "override_max_retries")
raise ret

task._orig_run, task.run = task.run, run


class NotificationException(Exception):
pass


@app.task(
autoretry_for=(
NotificationException,
requests.RequestException,
),
max_retries=settings.NOTIFICATION_DELIVERY_MAX_RETRIES,
retry_backoff=settings.NOTIFICATION_DELIVERY_RETRY_BACKOFF,
retry_backoff_max=settings.NOTIFICATION_DELIVERY_RETRY_BACKOFF_MAX,
)
@app.task
def deliver_message(sub_id: int, msg: dict, **kwargs) -> None:
"""
send msg to subscriber
Expand Down Expand Up @@ -81,3 +128,13 @@ def clean_old_notifications() -> None:
cleans up old "Notificatie" and "NotificatieResponse"
"""
call_command("clean_old_notifications")


add_autoretry_behaviour(
deliver_message,
autoretry_for=(
NotificationException,
requests.RequestException,
),
retry_jitter=False,
)

0 comments on commit 2aa6a0f

Please sign in to comment.