From 0ccbbd711f55dca4324ff5e110026cfdfa08a324 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Fri, 20 Dec 2024 10:02:35 +0100 Subject: [PATCH] Add custom domain admin page (#2348) * Add custom domain admin page * Add domain search --- app/admin_model.py | 133 +++++++++++++++++++--- server.py | 6 + templates/admin/custom_domain_search.html | 118 +++++++++++++++++++ 3 files changed, 244 insertions(+), 13 deletions(-) create mode 100644 templates/admin/custom_domain_search.html diff --git a/app/admin_model.py b/app/admin_model.py index 5d6bf6672..2123e364f 100644 --- a/app/admin_model.py +++ b/app/admin_model.py @@ -8,14 +8,16 @@ from flask_admin.model.template import EndpointLinkRowAction from markupsafe import Markup -from app import models, s3 +from app import models, s3, config from flask import redirect, url_for, request, flash, Response from flask_admin import expose, AdminIndexView from flask_admin.actions import action from flask_admin.contrib import sqla from flask_login import current_user +from app.custom_domain_validation import CustomDomainValidation, DomainValidationResult from app.db import Session +from app.dns_utils import get_network_dns_client from app.events.event_dispatcher import EventDispatcher from app.events.generated.event_pb2 import EventContent, UserPlanChanged from app.models import ( @@ -39,6 +41,7 @@ AliasMailbox, AliasAuditLog, UserAuditLog, + CustomDomain, ) from app.newsletter_utils import send_newsletter_to_user, send_newsletter_to_address from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction @@ -773,18 +776,19 @@ class InvalidMailboxDomainAdmin(SLModelView): class EmailSearchResult: - no_match: bool = True - alias: Optional[Alias] = None - alias_audit_log: Optional[List[AliasAuditLog]] = None - mailbox: List[Mailbox] = [] - mailbox_count: int = 0 - deleted_alias: Optional[DeletedAlias] = None - deleted_alias_audit_log: Optional[List[AliasAuditLog]] = None - domain_deleted_alias: Optional[DomainDeletedAlias] = None - domain_deleted_alias_audit_log: Optional[List[AliasAuditLog]] = None - user: Optional[User] = None - user_audit_log: Optional[List[UserAuditLog]] = None - query: str + def __init__(self): + self.no_match: bool = True + self.alias: Optional[Alias] = None + self.alias_audit_log: Optional[List[AliasAuditLog]] = None + self.mailbox: List[Mailbox] = [] + self.mailbox_count: int = 0 + self.deleted_alias: Optional[DeletedAlias] = None + self.deleted_alias_audit_log: Optional[List[AliasAuditLog]] = None + self.domain_deleted_alias: Optional[DomainDeletedAlias] = None + self.domain_deleted_alias_audit_log: Optional[List[AliasAuditLog]] = None + self.user: Optional[User] = None + self.user_audit_log: Optional[List[UserAuditLog]] = None + self.query: str @staticmethod def from_email(email: str) -> EmailSearchResult: @@ -916,3 +920,106 @@ def index(self): data=search, helper=EmailSearchHelpers, ) + + +class CustomDomainWithValidationData: + def __init__(self, domain: CustomDomain): + self.domain: CustomDomain = domain + self.ownership_expected: Optional[str] = None + self.ownership_validation: Optional[DomainValidationResult] = None + self.mx_expected: Optional[str] = None + self.mx_validation: Optional[DomainValidationResult] = None + self.spf_expected: Optional[str] = None + self.spf_validation: Optional[DomainValidationResult] = None + self.dkim_expected: {str: str} = {} + self.dkim_validation: {str: str} = {} + + +class CustomDomainSearchResult: + def __init__(self): + self.no_match: bool = False + self.user: Optional[User] = None + self.domains: list[CustomDomainWithValidationData] = [] + + @staticmethod + def from_user(user: Optional[User]) -> CustomDomainSearchResult: + out = CustomDomainSearchResult() + if user is None: + out.no_match = True + return out + out.user = user + dns_client = get_network_dns_client() + validator = CustomDomainValidation( + dkim_domain=config.EMAIL_DOMAIN, + partner_domains=config.PARTNER_DNS_CUSTOM_DOMAINS, + partner_domains_validation_prefixes=config.PARTNER_CUSTOM_DOMAIN_VALIDATION_PREFIXES, + dns_client=dns_client, + ) + for custom_domain in user.custom_domains: + validation_data = CustomDomainWithValidationData(custom_domain) + if not custom_domain.ownership_verified: + validation_data.ownership_expected = ( + validator.get_ownership_verification_record(custom_domain) + ) + validation_data.ownership_validation = ( + validator.validate_domain_ownership(custom_domain) + ) + if not custom_domain.verified: + validation_data.mx_expected = validator.get_expected_mx_records( + custom_domain + ) + validation_data.mx_validation = validator.validate_mx_records( + custom_domain + ) + if not custom_domain.spf_verified: + validation_data.spf_expected = validator.get_expected_spf_record( + custom_domain + ) + validation_data.spf_validation = validator.validate_spf_records( + custom_domain + ) + if not custom_domain.dkim_verified: + validation_data.dkim_expected = validator.get_dkim_records( + custom_domain + ) + validation_data.dkim_validation = validator.validate_dkim_records( + custom_domain + ) + out.domains.append(validation_data) + print(validation_data.dkim_expected, validation_data.dkim_validation) + + return out + + +class CustomDomainSearchAdmin(BaseView): + def is_accessible(self): + return current_user.is_authenticated and current_user.is_admin + + def inaccessible_callback(self, name, **kwargs): + # redirect to login page if user doesn't have access + flash("You don't have access to the admin page", "error") + return redirect(url_for("dashboard.index", next=request.url)) + + @expose("/", methods=["GET", "POST"]) + def index(self): + query = request.args.get("user") + if query is None: + search = CustomDomainSearchResult() + else: + try: + user_id = int(query) + user = User.get_by(id=user_id) + except ValueError: + user = User.get_by(email=query) + if user is None: + cd = CustomDomain.get_by(domain=query) + if cd is not None: + user = cd.user + search = CustomDomainSearchResult.from_user(user) + print("NEW", search.domains) + + return self.render( + "admin/custom_domain_search.html", + data=search, + query=query, + ) diff --git a/server.py b/server.py index 6018b7b3f..e707b038b 100644 --- a/server.py +++ b/server.py @@ -44,6 +44,7 @@ MetricAdmin, InvalidMailboxDomainAdmin, EmailSearchAdmin, + CustomDomainSearchAdmin, ) from app.api.base import api_bp from app.auth.base import auth_bp @@ -443,6 +444,11 @@ def init_admin(app): admin.init_app(app, index_view=SLAdminIndexView()) admin.add_view(EmailSearchAdmin(name="Email Search", endpoint="email_search")) + admin.add_view( + CustomDomainSearchAdmin( + name="Custom domain search", endpoint="custom_domain_search" + ) + ) admin.add_view(UserAdmin(User, Session)) admin.add_view(AliasAdmin(Alias, Session)) admin.add_view(MailboxAdmin(Mailbox, Session)) diff --git a/templates/admin/custom_domain_search.html b/templates/admin/custom_domain_search.html new file mode 100644 index 000000000..0ab066557 --- /dev/null +++ b/templates/admin/custom_domain_search.html @@ -0,0 +1,118 @@ +{% extends 'admin/master.html' %} + +{% macro show_user(user) -%} +

User {{ user.email }} with ID {{ user.id }}.

+ + + + + + + + + + + + + + + + {% if user.activated %} + + + {% else %} + + {% endif %} + {% if user.disabled %} + + + {% else %} + + {% endif %} + + + + +
User IDEmailVerifiedStatusPaidPremium
{{ user.id }} + {{ user.email }} + ActivatedPendingDisabledEnabled{{ "yes" if user.is_paid() else "No" }}{{ "yes" if user.is_premium() else "No" }}
+{%- endmacro %} + + +{% macro show_verification(title, expected, errors) -%} + {% if not expected %} +

{{ title }} Verified

+ {% else %} +

{{ title }}

+

Expected

+

{{expected}}

+

Current response

+ + {% endif %} +{%- endmacro %} + + +{% macro show_domain(domain_with_data) -%} +

Domain {{ domain_with_data.domain.domain }}

+ {% set domain = domain_with_data.domain %} + + +{%- endmacro %} + + +{% block body %} + +
+
+
+ + +
+ +
+
+ {% if data.no_match and query %} + + {% endif %} + {% if data.user %} +
+

Found User {{ data.user.email }}

+ {{ show_user(data.user) }} +
+ {% endif %} +
+ + {% for domain_with_data in data.domains %} +
+
+ {{ show_domain(domain_with_data) }} +
+
+ {% endfor %} +
+ + +{% endblock %}