diff --git a/app/config.py b/app/config.py index 97b1e0a47..9ec2cb655 100644 --- a/app/config.py +++ b/app/config.py @@ -309,6 +309,7 @@ def get_env_dict(env_var: str) -> dict[str, str]: JOB_SEND_USER_REPORT = "send-user-report" JOB_SEND_PROTON_WELCOME_1 = "proton-welcome-1" JOB_SEND_ALIAS_CREATION_EVENTS = "send-alias-creation-events" +JOB_SEND_EVENT_TO_WEBHOOK = "send-event-to-webhook" # for pagination PAGE_LIMIT = 20 diff --git a/app/jobs/send_event_job.py b/app/jobs/send_event_job.py new file mode 100644 index 000000000..3f5ca6169 --- /dev/null +++ b/app/jobs/send_event_job.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import base64 +from typing import Optional + +import arrow + +from app import config +from app.errors import ProtonPartnerNotSetUp +from app.events.generated import event_pb2 +from app.events.generated.event_pb2 import EventContent +from app.models import ( + User, + Job, + PartnerUser, +) +from app.proton.utils import get_proton_partner +from events.event_sink import EventSink + + +class SendEventToWebhookJob: + def __init__(self, user: User, event: EventContent): + self._user: User = user + self._event: EventContent = event + + def run(self, sink: EventSink) -> bool: + # Check if the current user has a partner_id + try: + proton_partner_id = get_proton_partner().id + except ProtonPartnerNotSetUp: + return False + + # It has. Retrieve the information for the PartnerUser + partner_user = PartnerUser.get_by( + user_id=self._user.id, partner_id=proton_partner_id + ) + if partner_user is None: + return True + event = event_pb2.Event( + user_id=self._user.id, + external_user_id=partner_user.external_user_id, + partner_id=partner_user.partner_id, + content=self._event, + ) + + serialized = event.SerializeToString() + return sink.send_data_to_webhook(serialized) + + @staticmethod + def create_from_job(job: Job) -> Optional[SendEventToWebhookJob]: + user = User.get(job.payload["user_id"]) + if not user: + return None + event_data = base64.b64decode(job.payload["event"]) + event = event_pb2.EventContent() + event.ParseFromString(event_data) + + return SendEventToWebhookJob(user=user, event=event) + + def store_job_in_db(self, run_at: Optional[arrow.Arrow]) -> Job: + stub = self._event.SerializeToString() + return Job.create( + name=config.JOB_SEND_EVENT_TO_WEBHOOK, + payload={ + "user_id": self._user.id, + "event": base64.b64encode(stub).decode("utf-8"), + }, + run_at=run_at if run_at is not None else arrow.now(), + commit=True, + ) diff --git a/events/event_sink.py b/events/event_sink.py index e61e2e836..115f68552 100644 --- a/events/event_sink.py +++ b/events/event_sink.py @@ -12,6 +12,10 @@ class EventSink(ABC): def process(self, event: SyncEvent) -> bool: pass + @abstractmethod + def send_data_to_webhook(self, data: bytes) -> bool: + pass + class HttpEventSink(EventSink): def process(self, event: SyncEvent) -> bool: @@ -21,9 +25,16 @@ def process(self, event: SyncEvent) -> bool: LOG.info(f"Sending event {event.id} to {EVENT_WEBHOOK}") + if self.send_data_to_webhook(event.content): + LOG.info(f"Event {event.id} sent successfully to webhook") + return True + + return False + + def send_data_to_webhook(self, data: bytes) -> bool: res = requests.post( url=EVENT_WEBHOOK, - data=event.content, + data=data, headers={"Content-Type": "application/x-protobuf"}, verify=not EVENT_WEBHOOK_SKIP_VERIFY_SSL, ) @@ -36,7 +47,6 @@ def process(self, event: SyncEvent) -> bool: ) return False else: - LOG.info(f"Event {event.id} sent successfully to webhook") return True @@ -44,3 +54,7 @@ class ConsoleEventSink(EventSink): def process(self, event: SyncEvent) -> bool: LOG.info(f"Handling event {event.id}") return True + + def send_data_to_webhook(self, data: bytes) -> bool: + LOG.info(f"Sending {len(data)} bytes to webhook") + return True diff --git a/job_runner.py b/job_runner.py index e2aefc130..89d6f2de2 100644 --- a/job_runner.py +++ b/job_runner.py @@ -18,6 +18,7 @@ from app.import_utils import handle_batch_import from app.jobs.event_jobs import send_alias_creation_events_for_user from app.jobs.export_user_data_job import ExportUserDataJob +from app.jobs.send_event_job import SendEventToWebhookJob from app.log import LOG from app.models import User, Job, BatchImport, Mailbox, CustomDomain, JobState from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction @@ -300,6 +301,10 @@ def process_job(job: Job): send_alias_creation_events_for_user( user, dispatcher=PostgresDispatcher.get() ) + elif job.name == config.JOB_SEND_EVENT_TO_WEBHOOK: + send_job = SendEventToWebhookJob.create_from_job(job) + if send_job: + send_job.run() else: LOG.e("Unknown job name %s", job.name) diff --git a/tests/jobs/test_send_event_to_webhook.py b/tests/jobs/test_send_event_to_webhook.py new file mode 100644 index 000000000..dacc48058 --- /dev/null +++ b/tests/jobs/test_send_event_to_webhook.py @@ -0,0 +1,40 @@ +import arrow + +from app import config +from app.events.generated.event_pb2 import EventContent, AliasDeleted +from app.jobs.send_event_job import SendEventToWebhookJob +from app.models import PartnerUser +from app.proton.utils import get_proton_partner +from events.event_sink import ConsoleEventSink +from tests.utils import create_new_user, random_token + + +def test_serialize_and_deserialize_job(): + user = create_new_user() + alias_id = 34 + alias_email = "a@b.c" + event = EventContent(alias_deleted=AliasDeleted(id=alias_id, email=alias_email)) + run_at = arrow.now().shift(hours=10) + db_job = SendEventToWebhookJob(user, event).store_job_in_db(run_at=run_at) + assert db_job.run_at == run_at + assert db_job.name == config.JOB_SEND_EVENT_TO_WEBHOOK + job = SendEventToWebhookJob.create_from_job(db_job) + assert job._user.id == user.id + assert job._event.alias_deleted.id == alias_id + assert job._event.alias_deleted.email == alias_email + + +def test_send_event_to_webhook(): + user = create_new_user() + PartnerUser.create( + user_id=user.id, + partner_id=get_proton_partner().id, + external_user_id=random_token(10), + flush=True, + ) + alias_id = 34 + alias_email = "a@b.c" + event = EventContent(alias_deleted=AliasDeleted(id=alias_id, email=alias_email)) + job = SendEventToWebhookJob(user, event) + sink = ConsoleEventSink() + assert job.run(sink)