diff --git a/apps/base/__init__.py b/apps/base/__init__.py index 9e5602404..e3854cd7a 100644 --- a/apps/base/__init__.py +++ b/apps/base/__init__.py @@ -207,4 +207,5 @@ def deliveries(): from . import tasks_banking # noqa from . import tasks_export # noqa from . import tasks_videos # noqa +from ..notifications import tasks # noqa from . import dev # noqa diff --git a/apps/notifications/jobs.py b/apps/notifications/jobs.py deleted file mode 100644 index 86c15d8af..000000000 --- a/apps/notifications/jobs.py +++ /dev/null @@ -1,88 +0,0 @@ -from sqlalchemy import and_ -from main import db -from datetime import datetime, timedelta -from flask import current_app as app - -from models import scheduled_task -from models.cfp import Proposal -from models.user import User -from models.web_push import PushNotificationJob -from models.notifications import UserNotificationPreference -from pywebpush import webpush, WebPushException - - -def deliver_notification(job: PushNotificationJob): - """Deliver a push notification from a PushNotificationJob. - - The passed job will be mutated to reflect delivery state. A job which isn't - queued will be skipped over. - """ - if job.state != "queued": - return - - try: - webpush( - subscription_info=job.target.subscription_info, - data=job.title, - vapid_private_key=app.config["WEBPUSH_PRIVATE_KEY"], - vapid_claims={ - "sub": "mailto:contact@emfcamp.org", - }, - ) - - job.state = "delivered" - except WebPushException as err: - job.state = "failed" - job.error = err.message - - -@scheduled_task(minutes=1) -def send_queued_notifications(): - jobs = PushNotificationJob.query.where( - PushNotificationJob.state == "queued" - and (PushNotificationJob.not_before is None or PushNotificationJob.not_before <= datetime.now()) - ).all() - - for job in jobs: - deliver_notification(job) - db.session.add(job) - - db.session.commit() - - -@scheduled_task(minutes=15) -def queue_content_notifications(time=None) -> None: - if time is None: - time = datetime.now() - - users = User.query.join( - UserNotificationPreference, - User.notification_preferences.and_(UserNotificationPreference.favourited_content), - ) - - upcoming_content = Proposal.query.filter( - and_(Proposal.scheduled_time >= time, Proposal.scheduled_time <= time + timedelta(minutes=16)) - ).all() - - for user in users: - user_favourites = [f.id for f in user.favourites] - favourites = [p for p in upcoming_content if p.id in user_favourites] - for proposal in favourites: - for target in user.web_push_targets: - related_to = f"favourite,user:{user.id},proposal:{proposal.id},target:{target.id}" - if ( - PushNotificationJob.query.where( - PushNotificationJob.related_to == related_to - ).one_or_none() - is None - ): - job = PushNotificationJob( - target=target, - title=f"{proposal.title} is happening soon at {proposal.scheduled_venue.name}", - related_to=related_to, - not_before=proposal.scheduled_time - timedelta(minutes=15), - ) - print(f"Queued notification for {job.related_to}") - db.session.add(job) - - db.session.commit() diff --git a/apps/notifications/tasks.py b/apps/notifications/tasks.py index e69de29bb..23c38b2e3 100644 --- a/apps/notifications/tasks.py +++ b/apps/notifications/tasks.py @@ -0,0 +1,103 @@ +from sqlalchemy import and_ +from main import db +from datetime import datetime, timedelta +from flask import current_app as app + +from models import scheduled_task +from models.cfp import Proposal +from models.user import User +from models.volunteer.shift import Shift, ShiftEntry +from models.web_push import PushNotificationJob, enqueue_if_not_exists +from models.notifications import UserNotificationPreference +from pywebpush import webpush, WebPushException + + +def deliver_notification(job: PushNotificationJob): + """Deliver a push notification from a PushNotificationJob. + + The passed job will be mutated to reflect delivery state. A job which isn't + queued will be skipped over. + """ + if job.state != "queued": + return + + try: + webpush( + subscription_info=job.target.subscription_info, + data=job.title, + vapid_private_key=app.config["WEBPUSH_PRIVATE_KEY"], + vapid_claims={ + "sub": "mailto:contact@emfcamp.org", + }, + ) + + job.state = "delivered" + except WebPushException as err: + job.state = "failed" + job.error = err.message + + +@scheduled_task(minutes=1) +def send_queued_notifications(): + jobs = PushNotificationJob.query.where( + PushNotificationJob.state == "queued" + and (PushNotificationJob.not_before is None or PushNotificationJob.not_before <= datetime.now()) + ).all() + + for job in jobs: + deliver_notification(job) + db.session.add(job) + + db.session.commit() + + +@scheduled_task(minutes=15) +def queue_content_notifications(time=None) -> None: + if time is None: + time = datetime.now() + + users = User.query.join( + UserNotificationPreference, + User.notification_preferences.and_(UserNotificationPreference.favourited_content), + ) + + upcoming_content = Proposal.query.filter( + and_(Proposal.scheduled_time >= time, Proposal.scheduled_time <= time + timedelta(minutes=16)) + ).all() + + for user in users: + user_favourites = [f.id for f in user.favourites] + favourites = [p for p in upcoming_content if p.id in user_favourites] + for proposal in favourites: + for target in user.web_push_targets: + enqueue_if_not_exists( + target=target, + related_to=f"favourite,user:{user.id},proposal:{proposal.id},target:{target.id}", + title=f"{proposal.title} is happening soon at {proposal.scheduled_venue.name}", + not_before=proposal.scheduled_time - timedelta(minutes=15), + ) + + db.session.commit() + + +@scheduled_task(minutes=15) +def queue_shift_notifications(time=None) -> None: + if time is None: + time = datetime.now() + + upcoming_shifts: list[Shift] = Shift.query.filter( + and_(Shift.start >= time, Shift.start <= time + timedelta(minutes=16)) + ).all() + + for shift in upcoming_shifts: + for user in shift.volunteers: + if user.notification_preferences.volunteer_shifts: + for target in user.web_push_targets: + enqueue_if_not_exists( + target=target, + related_to=f"shift_reminder,user:{user.id},shift:{shift.id},target:{target.id}", + title=f"Your {shift.role.name} shift is about to start, please go to {shift.venue.name}.", + not_before=shift.start - timedelta(minutes=15), + ) + + db.session.commit() diff --git a/models/web_push.py b/models/web_push.py index c71648485..96e64764c 100644 --- a/models/web_push.py +++ b/models/web_push.py @@ -22,6 +22,31 @@ def notify(target, message): ) +def enqueue_if_not_exists( + target: "WebPushTarget", + title: str, + body: str | None = None, + related_to: str | None = None, + not_before: datetime | None = None, +) -> "PushNotificationJob": + if related_to is not None: + existing_job = PushNotificationJob.query.where( + PushNotificationJob.related_to == related_to + ).one_or_none() + if existing_job is not None: + return existing_job + + job = PushNotificationJob( + target=target, + title=title, + body=body, + related_to=related_to, + not_before=not_before, + ) + db.session.add(job) + return job + + class WebPushTarget(BaseModel): __table_name__ = "web_push_target" id = db.Column(db.Integer, primary_key=True)