From bf99475e6272cb5776cba19b25645fe61e813efb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Fri, 28 Jun 2024 18:26:11 +0200 Subject: [PATCH] Add warning to subject when possible phishing is detected (cherry picked from commit 8f714b9fab49354bfcc10dad8e149a8a0aefdc4c) (cherry picked from commit 21490ec1934b74de7d2e38326735329a87cf5dfd) --- app/email_utils.py | 12 +++++++++++- app/handler/dmarc.py | 2 ++ tests/test_email_utils.py | 28 ++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/app/email_utils.py b/app/email_utils.py index 070d29102..d962ac543 100644 --- a/app/email_utils.py +++ b/app/email_utils.py @@ -925,10 +925,20 @@ def decode_text(text: str, encoding: EmailEncoding = EmailEncoding.NO) -> str: return text -def add_header(msg: Message, text_header, html_header=None) -> Message: +def add_header( + msg: Message, text_header, html_header=None, subject_prefix=None +) -> Message: if not html_header: html_header = text_header.replace("\n", "
") + if subject_prefix is not None: + subject = msg[headers.SUBJECT] + if not subject: + msg.add_header(headers.SUBJECT, subject_prefix) + else: + subject = f"{subject_prefix} {subject}" + msg.replace_header(headers.SUBJECT, subject) + content_type = msg.get_content_type().lower() if content_type == "text/plain": encoding = get_encoding(msg) diff --git a/app/handler/dmarc.py b/app/handler/dmarc.py index 895fa7036..3d25196c8 100644 --- a/app/handler/dmarc.py +++ b/app/handler/dmarc.py @@ -64,6 +64,7 @@ def apply_dmarc_policy_for_forward_phase( msg, warning_plain_text, warning_html, + subject_prefix="[Possible phishing attempt]", ) return changed_msg, None @@ -76,6 +77,7 @@ def apply_dmarc_policy_for_forward_phase( msg, warning_plain_text, warning_html, + subject_prefix="[Possible phishing attempt]", ) return changed_msg, None diff --git a/tests/test_email_utils.py b/tests/test_email_utils.py index 726ab97a6..7e133f1f0 100644 --- a/tests/test_email_utils.py +++ b/tests/test_email_utils.py @@ -9,6 +9,7 @@ from app import config from app.config import MAX_ALERT_24H, ROOT_DIR from app.db import Session +from app.email import headers from app.email_utils import ( get_email_domain_part, can_create_directory_for_address, @@ -354,6 +355,33 @@ def test_is_valid_email(): assert not is_valid_email("emoji👌@gmail.com") +def test_add_subject_prefix(): + msg = email.message_from_string( + """Subject: Potato +Content-Transfer-Encoding: 7bit + +hello +""" + ) + new_msg = add_header(msg, "text header", "html header", subject_prefix="[TEST]") + assert "text header" in new_msg.as_string() + assert "html header" not in new_msg.as_string() + assert new_msg[headers.SUBJECT] == "[TEST] Potato" + + +def test_add_subject_prefix_with_no_header(): + msg = email.message_from_string( + """Content-Transfer-Encoding: 7bit + +hello +""" + ) + new_msg = add_header(msg, "text header", "html header", subject_prefix="[TEST]") + assert "text header" in new_msg.as_string() + assert "html header" not in new_msg.as_string() + assert new_msg[headers.SUBJECT] == "[TEST]" + + def test_add_header_plain_text(): msg = email.message_from_string( """Content-Type: text/plain; charset=us-ascii