Skip to content

Commit

Permalink
more reliable account event processing (#6489)
Browse files Browse the repository at this point in the history
  • Loading branch information
escattone authored Feb 7, 2025
1 parent 0ddffe6 commit a381969
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
Empty file.
Empty file.
20 changes: 20 additions & 0 deletions kitsune/users/management/commands/process_account_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from django.core.management.base import BaseCommand

from kitsune.users.tasks import process_unprocessed_account_events


class Command(BaseCommand):
help = "Process all unprocessed account events created within the given past number of days."

def add_arguments(self, parser):
parser.add_argument(
"num_days_ago",
type=int,
help=(
"The past number of days within which the "
"unprocessed account events have been created."
),
)

def handle(self, *args, **options):
process_unprocessed_account_events.delay(options["num_days_ago"])
39 changes: 33 additions & 6 deletions kitsune/users/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from datetime import datetime
from datetime import datetime, timedelta

from celery import shared_task

Expand All @@ -10,7 +10,12 @@
from kitsune.users.utils import anonymize_user


@shared_task
shared_task_with_retry = shared_task(
acks_late=True, autoretry_for=(Exception,), retry_backoff=2, retry_kwargs=dict(max_retries=4)
)


@shared_task_with_retry
@skip_if_read_only_mode
def process_event_delete_user(event_id):
event = AccountEvent.objects.get(id=event_id)
Expand All @@ -21,7 +26,7 @@ def process_event_delete_user(event_id):
event.save()


@shared_task
@shared_task_with_retry
@skip_if_read_only_mode
def process_event_subscription_state_change(event_id):
event = AccountEvent.objects.get(id=event_id)
Expand All @@ -48,7 +53,7 @@ def process_event_subscription_state_change(event_id):
event.save()


@shared_task
@shared_task_with_retry
@skip_if_read_only_mode
def process_event_password_change(event_id):
event = AccountEvent.objects.get(id=event_id)
Expand All @@ -67,9 +72,9 @@ def process_event_password_change(event_id):
event.save()


@shared_task
@shared_task_with_retry
@skip_if_read_only_mode
def process_event_profile_change(event_id):
def process_event_profile_change(self, event_id):
event = AccountEvent.objects.get(id=event_id)
refresh_token = event.profile.fxa_refresh_token

Expand All @@ -86,3 +91,25 @@ def process_event_profile_change(event_id):

event.status = AccountEvent.PROCESSED
event.save()


@shared_task
def process_unprocessed_account_events(days):
"""
Attempt to process all unprocessed account events that have been
created within the past "days" number of days.
"""
days_ago = datetime.now() - timedelta(days=days)

for event in AccountEvent.objects.filter(
status=AccountEvent.UNPROCESSED, created_at__gte=days_ago
):
match event.event_type:
case AccountEvent.DELETE_USER:
process_event_delete_user.delay(event.id)
case AccountEvent.SUBSCRIPTION_STATE_CHANGE:
process_event_subscription_state_change.delay(event.id)
case AccountEvent.PASSWORD_CHANGE:
process_event_password_change.delay(event.id)
case AccountEvent.PROFILE_CHANGE:
process_event_profile_change.delay(event.id)

0 comments on commit a381969

Please sign in to comment.