Skip to content

Commit

Permalink
✨ [#42] Implement logout utility function
Browse files Browse the repository at this point in the history
This definitely works with keycloak, it should probably work
with other OpenID providers but those are yet untested.
  • Loading branch information
sergei-maertens committed May 28, 2024
1 parent 967d9ad commit 1fa968e
Show file tree
Hide file tree
Showing 7 changed files with 1,181 additions and 2 deletions.
1 change: 1 addition & 0 deletions mozilla_django_oidc_db/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"oidc_op_token_endpoint": "token_endpoint",
"oidc_op_user_endpoint": "userinfo_endpoint",
"oidc_op_jwks_endpoint": "jwks_uri",
"oidc_op_logout_endpoint": "end_session_endpoint",
}

OPEN_ID_CONFIG_PATH = ".well-known/openid-configuration"
27 changes: 27 additions & 0 deletions mozilla_django_oidc_db/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import logging
from collections.abc import Collection
from copy import deepcopy

import requests
from glom import Path, PathAccessError, assign, glom
from requests.utils import _parse_content_type_header # type: ignore

from .models import OpenIDConnectConfigBase
from .typing import ClaimPath, JSONObject, JSONValue

logger = logging.getLogger(__name__)


def obfuscate_claim_value(value: JSONValue) -> JSONValue:
"""
Expand Down Expand Up @@ -51,3 +56,25 @@ def extract_content_type(ct_header: str) -> str:
content_type, _ = _parse_content_type_header(ct_header)
# discard the params, we only want the content type itself
return content_type


def do_op_logout(config: OpenIDConnectConfigBase, id_token: str) -> None:
"""
Perform the logout with the OpenID Provider.
Standard: https://openid.net/specs/openid-connect-rpinitiated-1_0.html#RPLogout
"""
logout_endpoint = config.oidc_op_logout_endpoint
if not logout_endpoint:
return

response = requests.post(logout_endpoint, data={"id_token_hint": id_token})
if not response.ok:
logger.warning(
"Failed to log out the user at the OpenID Provider. Status code: %s",
response.status_code,
extra={
"response": response,
"status_code": response.status_code,
},
)

Large diffs are not rendered by default.

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions tests/test_admin_form.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def test_derive_endpoints_success():
"token_endpoint": "http://provider.com/auth/realms/master/protocol/openid-connect/token",
"userinfo_endpoint": "http://provider.com/auth/realms/master/protocol/openid-connect/userinfo",
"jwks_uri": "http://provider.com/auth/realms/master/protocol/openid-connect/certs",
"end_session_endpoint": "http://provider.com/auth/realms/master/protocol/openid-connect/logout",
}
with requests_mock.Mocker() as m:
m.get(
Expand Down
88 changes: 88 additions & 0 deletions tests/test_logout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from django.test import Client
from django.urls import reverse

import pytest
from requests import Session

from mozilla_django_oidc_db.models import OpenIDConnectConfig
from mozilla_django_oidc_db.utils import do_op_logout

from .utils import keycloak_login


@pytest.fixture
def kc_session(
settings,
keycloak_config,
mock_state_and_nonce,
client,
django_user_model,
vcr,
):
settings.OIDC_STORE_ID_TOKEN = True
session = Session()

login_url = reverse("login")
django_login_response = client.get(login_url)
assert django_login_response.status_code == 302

# simulate login to Keycloak
redirect_uri = keycloak_login(django_login_response["Location"], session=session)

# complete the login flow on our end
callback_response = client.get(redirect_uri)

assert callback_response.status_code == 302
assert callback_response["Location"] == "/admin/"

# a user was created
assert django_user_model.objects.count() == 1

# assert that we are logged in to keycloak
django_login_response2 = client.get(login_url)

kc_response = session.get(django_login_response2["Location"], allow_redirects=False)
assert kc_response.status_code == 302
assert kc_response.headers["Location"].startswith("http://testserver")

yield (client, session)

session.close()


@pytest.mark.vcr
@pytest.mark.oidcconfig(oidc_op_logout_endpoint="")
def test_logout_without_endpoint_configured(
keycloak_config: OpenIDConnectConfig,
kc_session: tuple[Client, Session],
):
client, session = kc_session

do_op_logout(keycloak_config, id_token=client.session["oidc_id_token"])

# check that we are still authenticated in keycloak
login_url = reverse("login")
django_login_response = client.get(login_url)
kc_response = session.get(django_login_response["Location"], allow_redirects=False)

assert kc_response.status_code == 302
assert kc_response.headers["Location"].startswith("http://testserver")


@pytest.mark.vcr
def test_logout_with_logout_endpoint_configured(
keycloak_config: OpenIDConnectConfig,
kc_session: tuple[Client, Session],
):
assert keycloak_config.oidc_op_logout_endpoint
client, session = kc_session

do_op_logout(keycloak_config, id_token=client.session["oidc_id_token"])

# check that we are still authenticated in keycloak
login_url = reverse("login")
django_login_response = client.get(login_url)
kc_response = session.get(django_login_response["Location"], allow_redirects=False)

assert kc_response.status_code == 200, "Did not end up on Keycloak's login page"
assert kc_response.headers["Content-Type"].startswith("text/html")
7 changes: 5 additions & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from contextlib import nullcontext

from pyquery import PyQuery as pq
from requests import Session

Expand All @@ -6,6 +8,7 @@ def keycloak_login(
login_url: str,
username: str = "testuser",
password: str = "testuser",
session: Session | None = None,
) -> str:
"""
Test helper to perform a keycloak login.
Expand All @@ -15,8 +18,8 @@ def keycloak_login(
:returns: The redirect URI to consume in the django application, with the ``code``
``state`` query parameters. Consume this with ``response = client.get(url)``.
"""

with Session() as session:
cm = Session() if session is None else nullcontext(session)
with cm as session:
login_page = session.get(login_url)
assert login_page.status_code == 200

Expand Down

0 comments on commit 1fa968e

Please sign in to comment.