From 2aa6a0f45629c94a5b6c9b8773211a97919ff8db Mon Sep 17 00:00:00 2001 From: Steven Bal Date: Mon, 27 Jun 2022 10:37:05 +0200 Subject: [PATCH] :sparkles: [open-zaak/open-zaak#1203] Retry vars configurable via admin --- src/nrc/api/tasks.py | 77 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 67 insertions(+), 10 deletions(-) diff --git a/src/nrc/api/tasks.py b/src/nrc/api/tasks.py index b633a5ae..26dc41ec 100644 --- a/src/nrc/api/tasks.py +++ b/src/nrc/api/tasks.py @@ -1,12 +1,15 @@ import json import logging -from django.conf import settings from django.core.management import call_command from django.core.serializers.json import DjangoJSONEncoder from django.utils.translation import gettext_lazy as _ import requests +from celery.exceptions import Ignore, Retry +from celery.utils.time import get_exponential_backoff_interval +from vine.utils import wraps +from vng_api_common.notifications.models import NotificationsConfig from nrc.celery import app from nrc.datamodel.models import Abonnement, NotificatieResponse @@ -14,19 +17,63 @@ logger = logging.getLogger(__name__) +def add_autoretry_behaviour(task, **options): + """ + Adapted from celery to use admin configurable autoretry settings + """ + autoretry_for = tuple( + options.get("autoretry_for", getattr(task, "autoretry_for", ())) + ) + retry_kwargs = options.get("retry_kwargs", getattr(task, "retry_kwargs", {})) + retry_jitter = options.get("retry_jitter", getattr(task, "retry_jitter", True)) + + if autoretry_for and not hasattr(task, "_orig_run"): + + @wraps(task.run) + def run(*args, **kwargs): + config = NotificationsConfig.get_solo() + max_retries = config.notification_delivery_max_retries + retry_backoff = config.notification_delivery_retry_backoff + retry_backoff_max = config.notification_delivery_retry_backoff_max + + task.max_retries = max_retries + + try: + return task._orig_run(*args, **kwargs) + except Ignore: + # If Ignore signal occurs task shouldn't be retried, + # even if it suits autoretry_for list + raise + except Retry: + raise + except autoretry_for as exc: + if retry_backoff: + retry_kwargs["countdown"] = get_exponential_backoff_interval( + factor=retry_backoff, + retries=task.request.retries, + maximum=retry_backoff_max, + full_jitter=retry_jitter, + ) + # Override max_retries + if hasattr(task, "override_max_retries"): + retry_kwargs["max_retries"] = getattr( + task, "override_max_retries", max_retries + ) + + ret = task.retry(exc=exc, **retry_kwargs) + # Stop propagation + if hasattr(task, "override_max_retries"): + delattr(task, "override_max_retries") + raise ret + + task._orig_run, task.run = task.run, run + + class NotificationException(Exception): pass -@app.task( - autoretry_for=( - NotificationException, - requests.RequestException, - ), - max_retries=settings.NOTIFICATION_DELIVERY_MAX_RETRIES, - retry_backoff=settings.NOTIFICATION_DELIVERY_RETRY_BACKOFF, - retry_backoff_max=settings.NOTIFICATION_DELIVERY_RETRY_BACKOFF_MAX, -) +@app.task def deliver_message(sub_id: int, msg: dict, **kwargs) -> None: """ send msg to subscriber @@ -81,3 +128,13 @@ def clean_old_notifications() -> None: cleans up old "Notificatie" and "NotificatieResponse" """ call_command("clean_old_notifications") + + +add_autoretry_behaviour( + deliver_message, + autoretry_for=( + NotificationException, + requests.RequestException, + ), + retry_jitter=False, +)