Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(PC-34345)[API] setup celery tasks for mails #15838

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 168 additions & 2 deletions api/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies = [
"boto3==1.36.12",
"boussole==2.1.3",
"brevo-python==1.1.2",
"celery[redis]==5.4.0",
"click-option-group==0.5.6",
"click==8.1.8",
"clickhouse-sqlalchemy==0.2.7",
Expand Down Expand Up @@ -80,7 +81,7 @@ dependencies = [
"wtforms-sqlalchemy==0.4.2",
"wtforms==3.2.1",
"xlsxwriter==3.2.2",
"zeep==4.3.1",
"zeep==4.3.1"
]

[tool.poetry.group.dev]
Expand Down
Empty file.
19 changes: 19 additions & 0 deletions api/src/pcapi/celery_tasks/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from celery import Celery
from celery import Task
from flask import Flask


def celery_init_app(app: Flask) -> Celery:
# pylint: disable=abstract-method
# Should be ok to only override the __call__ method
# https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
Comment on lines +12 to +13
Copy link
Contributor

@jcicurel-pass jcicurel-pass Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pour l'instant c'est seulement les mails donc pas d'update en DB mais ce serait p-e pas mal d'avoir atomic à terme (ça risque de coincer au moment où on voudra passer une tâche sur celery, ou alors on peut se dire que ce sera l'occasion de la rendre atomic-compatible)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oui c'est vrai que du coup j'ai pas traité cette problématique pour le moment. Je vais voir pour le rajouter.


celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
4 changes: 4 additions & 0 deletions api/src/pcapi/celery_tasks/celery_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from pcapi.flask_app import app as flask_app


celery_app = flask_app.extensions["celery"]
44 changes: 44 additions & 0 deletions api/src/pcapi/celery_tasks/sendinblue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import logging

from brevo_python.rest import ApiException as SendinblueApiException
from celery import shared_task
from pydantic import ValidationError

from pcapi.core.external import sendinblue
from pcapi.core.mails.transactional.send_transactional_email import send_transactional_email
from pcapi.tasks.serialization.sendinblue_tasks import SendTransactionalEmailRequest
from pcapi.tasks.serialization.sendinblue_tasks import UpdateSendinblueContactRequest


logger = logging.getLogger(__name__)


@shared_task(name="mails.tasks.update_contact_attributes", autoretry_for=(SendinblueApiException,), retry_backoff=True)
def update_contact_attributes_task_celery(payload: dict) -> None:
try:
request = UpdateSendinblueContactRequest.parse_obj(payload)
sendinblue.make_update_request(request)
except ValidationError as exp:
logger.error("could not deserialize object", extra={"exception": exp})


@shared_task(
name="mails.tasks.send_transactional_email_primary", autoretry_for=(SendinblueApiException,), retry_backoff=True
)
def send_transactional_email_primary_task_celery(payload: dict) -> None:
try:
request = SendTransactionalEmailRequest.parse_obj(payload)
send_transactional_email(request)
except ValidationError as exp:
logger.error("could not deserialize object", extra={"exception": exp})


@shared_task(
name="mails.tasks.send_transactional_email_secondary", autoretry_for=(SendinblueApiException,), retry_backoff=True
)
def send_transactional_email_secondary_task_celery(payload: dict) -> None:
try:
request = SendTransactionalEmailRequest.parse_obj(payload)
send_transactional_email(request)
except ValidationError as exp:
logger.error("could not deserialize object", extra={"exception": exp})
19 changes: 15 additions & 4 deletions api/src/pcapi/core/external/sendinblue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
from brevo_python.rest import ApiException as SendinblueApiException

from pcapi import settings
from pcapi.celery_tasks.sendinblue import update_contact_attributes_task_celery
from pcapi.core import mails as mails_api
from pcapi.core.cultural_survey import models as cultural_survey_models
from pcapi.core.external.attributes import models as attributes_models
import pcapi.core.users.models as users_models
from pcapi.tasks.sendinblue_tasks import update_contact_attributes_task
from pcapi.models.feature import FeatureToggle
from pcapi.tasks.sendinblue_tasks import update_contact_attributes_task_cloud_tasks
from pcapi.tasks.serialization.sendinblue_tasks import UpdateSendinblueContactRequest


Expand Down Expand Up @@ -121,9 +123,15 @@ def update_contact_email(user: users_models.User, old_email: str, new_email: str
)

if asynchronous:
update_contact_attributes_task.delay(contact_request)
if FeatureToggle.WIP_ASYNCHRONOUS_CELERY_TASKS.is_active():
update_contact_attributes_task_celery.delay(contact_request.dict())
else:
update_contact_attributes_task_cloud_tasks.delay(contact_request)
else:
update_contact_attributes_task(contact_request)
if FeatureToggle.WIP_ASYNCHRONOUS_CELERY_TASKS.is_active():
update_contact_attributes_task_celery(contact_request.dict())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Il y a un inconvénient au JSON que je n'avais pas en tête, on perd ici le typing au passage (d'ailleurs la signature de update_contact_attributes_task_celery ne doit plus être bonne pour le type de payload)

Il faudrait des TypedDict sinon mais.. ça ferait doublon avec le modèle Pydantic dans ce genre de cas..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Après y a p-e des cas où on aura juste plus besoin du modèle Pydantic et le TypedDict suffira

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bah là effectivement je suis reparti tu modèle Pydantic pour coller le plus possible à l'existant.
Donc effectivement on perd le typing lors du check statique mais on vérifie quand même au runtime. Je voyais pas comment faire mieux sans faire de doublons.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

J'ai pas en tête beaucoup de cas de nos tâches async

Mais en fait si on a des schemas Pydantic définis seulement pour les passer à des tâches, en faisant le switch vers Celery / JSON ça me parait pas déconnant de switch en même temps les schemas vers des TypedDict

Par contre si on a des cas de schemas Pydantic utilisés par des tâches et par autre chose.. là c'est plus embêtant

Mais bon du coup tant que c'est un WIP sous FF on peut sans doute laisser comme ça (vu qu'on va pas virer le Pydantic maintenant de toute façon)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a vu ça avec @mgeoffray-pass et l'objectif du coup c'est de passer les objets en TypedDict.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C'est un peu la galère en vrai pour deux raisons :

  • Dans le décorateur de task on attend bien un modèle pydantic donc ça demande de revoir pas mal le système de cloud tasks actuel, ce qui est risqué.

Donc je me suis dit que j'allais dupliquer le type et faire un type cloud_task et un type celery à chaque fois mais je me suis heurté au problème suivant :

  • Dans le code qui utilise cet objet on utilise le . comme on le fait pour une classe pour accéder à ses attributs et pas les [] comme on le ferait sur un dict donc ça coince. Donc garder les deux systèmes en // ça devient compliqué.

En cherchant un peu des solutions je suis tombé sur un blogpost qui explique comment gérer les modèles pydantic dans Celery : https://benninger.ca/posts/celery-serializer-pydantic/

Et sinon fun fact : Dans la prochaine version de Celery il y a le support des modèles pydantic qui arrive : https://docs.celeryq.dev/en/latest/history/whatsnew-5.5.html#pydantic-support

Du coup je vois plusieurs options :

  • Mettre en place le serializer custom pour garder le fonctionnement actuel des modèles pydantic mais avec le serializer JSON.
  • Laisser comme j'ai fait pour le moment et traiter ça avec la 5.5 lorsqu'elle sort (mais bon je ne sais pas quand)
  • Refacto le système de cloud task pour pouvoir prendre aussi des TypedDict.

else:
update_contact_attributes_task_cloud_tasks(contact_request)


def update_contact_attributes(
Expand Down Expand Up @@ -151,7 +159,10 @@ def update_contact_attributes(
)

if asynchronous:
update_contact_attributes_task.delay(contact_request)
if FeatureToggle.WIP_ASYNCHRONOUS_CELERY_TASKS.is_active():
update_contact_attributes_task_celery.delay(contact_request.dict())
else:
update_contact_attributes_task_cloud_tasks.delay(contact_request)
else:
make_update_request(contact_request)

Expand Down
Loading
Loading