-
Notifications
You must be signed in to change notification settings - Fork 41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(PC-34345)[API] setup celery tasks for mails #15838
base: master
Are you sure you want to change the base?
Changes from all commits
59fb87f
645da34
b5df74a
d2704a4
4d1f626
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from celery import Celery | ||
from celery import Task | ||
from flask import Flask | ||
|
||
|
||
def celery_init_app(app: Flask) -> Celery: | ||
# pylint: disable=abstract-method | ||
# Should be ok to only override the __call__ method | ||
# https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task | ||
class FlaskTask(Task): | ||
def __call__(self, *args: object, **kwargs: object) -> object: | ||
with app.app_context(): | ||
return self.run(*args, **kwargs) | ||
|
||
celery_app = Celery(app.name, task_cls=FlaskTask) | ||
celery_app.config_from_object(app.config["CELERY"]) | ||
celery_app.set_default() | ||
app.extensions["celery"] = celery_app | ||
return celery_app |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from pcapi.flask_app import app as flask_app | ||
|
||
|
||
celery_app = flask_app.extensions["celery"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import logging | ||
|
||
from brevo_python.rest import ApiException as SendinblueApiException | ||
from celery import shared_task | ||
from pydantic import ValidationError | ||
|
||
from pcapi.core.external import sendinblue | ||
from pcapi.core.mails.transactional.send_transactional_email import send_transactional_email | ||
from pcapi.tasks.serialization.sendinblue_tasks import SendTransactionalEmailRequest | ||
from pcapi.tasks.serialization.sendinblue_tasks import UpdateSendinblueContactRequest | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@shared_task(name="mails.tasks.update_contact_attributes", autoretry_for=(SendinblueApiException,), retry_backoff=True) | ||
def update_contact_attributes_task_celery(payload: dict) -> None: | ||
try: | ||
request = UpdateSendinblueContactRequest.parse_obj(payload) | ||
sendinblue.make_update_request(request) | ||
except ValidationError as exp: | ||
logger.error("could not deserialize object", extra={"exception": exp}) | ||
|
||
|
||
@shared_task( | ||
name="mails.tasks.send_transactional_email_primary", autoretry_for=(SendinblueApiException,), retry_backoff=True | ||
) | ||
def send_transactional_email_primary_task_celery(payload: dict) -> None: | ||
try: | ||
request = SendTransactionalEmailRequest.parse_obj(payload) | ||
send_transactional_email(request) | ||
except ValidationError as exp: | ||
logger.error("could not deserialize object", extra={"exception": exp}) | ||
|
||
|
||
@shared_task( | ||
name="mails.tasks.send_transactional_email_secondary", autoretry_for=(SendinblueApiException,), retry_backoff=True | ||
) | ||
def send_transactional_email_secondary_task_celery(payload: dict) -> None: | ||
try: | ||
request = SendTransactionalEmailRequest.parse_obj(payload) | ||
send_transactional_email(request) | ||
except ValidationError as exp: | ||
logger.error("could not deserialize object", extra={"exception": exp}) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,11 +15,13 @@ | |
from brevo_python.rest import ApiException as SendinblueApiException | ||
|
||
from pcapi import settings | ||
from pcapi.celery_tasks.sendinblue import update_contact_attributes_task_celery | ||
from pcapi.core import mails as mails_api | ||
from pcapi.core.cultural_survey import models as cultural_survey_models | ||
from pcapi.core.external.attributes import models as attributes_models | ||
import pcapi.core.users.models as users_models | ||
from pcapi.tasks.sendinblue_tasks import update_contact_attributes_task | ||
from pcapi.models.feature import FeatureToggle | ||
from pcapi.tasks.sendinblue_tasks import update_contact_attributes_task_cloud_tasks | ||
from pcapi.tasks.serialization.sendinblue_tasks import UpdateSendinblueContactRequest | ||
|
||
|
||
|
@@ -121,9 +123,15 @@ def update_contact_email(user: users_models.User, old_email: str, new_email: str | |
) | ||
|
||
if asynchronous: | ||
update_contact_attributes_task.delay(contact_request) | ||
if FeatureToggle.WIP_ASYNCHRONOUS_CELERY_TASKS.is_active(): | ||
update_contact_attributes_task_celery.delay(contact_request.dict()) | ||
else: | ||
update_contact_attributes_task_cloud_tasks.delay(contact_request) | ||
else: | ||
update_contact_attributes_task(contact_request) | ||
if FeatureToggle.WIP_ASYNCHRONOUS_CELERY_TASKS.is_active(): | ||
update_contact_attributes_task_celery(contact_request.dict()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Il y a un inconvénient au JSON que je n'avais pas en tête, on perd ici le typing au passage (d'ailleurs la signature de Il faudrait des TypedDict sinon mais.. ça ferait doublon avec le modèle Pydantic dans ce genre de cas.. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Après y a p-e des cas où on aura juste plus besoin du modèle Pydantic et le TypedDict suffira There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bah là effectivement je suis reparti tu modèle Pydantic pour coller le plus possible à l'existant. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. J'ai pas en tête beaucoup de cas de nos tâches async Mais en fait si on a des schemas Pydantic définis seulement pour les passer à des tâches, en faisant le switch vers Celery / JSON ça me parait pas déconnant de switch en même temps les schemas vers des TypedDict Par contre si on a des cas de schemas Pydantic utilisés par des tâches et par autre chose.. là c'est plus embêtant Mais bon du coup tant que c'est un WIP sous FF on peut sans doute laisser comme ça (vu qu'on va pas virer le Pydantic maintenant de toute façon) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On a vu ça avec @mgeoffray-pass et l'objectif du coup c'est de passer les objets en TypedDict. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. C'est un peu la galère en vrai pour deux raisons :
Donc je me suis dit que j'allais dupliquer le type et faire un type cloud_task et un type celery à chaque fois mais je me suis heurté au problème suivant :
En cherchant un peu des solutions je suis tombé sur un blogpost qui explique comment gérer les modèles pydantic dans Celery : https://benninger.ca/posts/celery-serializer-pydantic/ Et sinon fun fact : Dans la prochaine version de Celery il y a le support des modèles pydantic qui arrive : https://docs.celeryq.dev/en/latest/history/whatsnew-5.5.html#pydantic-support Du coup je vois plusieurs options :
|
||
else: | ||
update_contact_attributes_task_cloud_tasks(contact_request) | ||
|
||
|
||
def update_contact_attributes( | ||
|
@@ -151,7 +159,10 @@ def update_contact_attributes( | |
) | ||
|
||
if asynchronous: | ||
update_contact_attributes_task.delay(contact_request) | ||
if FeatureToggle.WIP_ASYNCHRONOUS_CELERY_TASKS.is_active(): | ||
update_contact_attributes_task_celery.delay(contact_request.dict()) | ||
else: | ||
update_contact_attributes_task_cloud_tasks.delay(contact_request) | ||
else: | ||
make_update_request(contact_request) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pour l'instant c'est seulement les mails donc pas d'update en DB mais ce serait p-e pas mal d'avoir atomic à terme (ça risque de coincer au moment où on voudra passer une tâche sur celery, ou alors on peut se dire que ce sera l'occasion de la rendre atomic-compatible)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oui c'est vrai que du coup j'ai pas traité cette problématique pour le moment. Je vais voir pour le rajouter.