Skip to content

Commit

Permalink
Implemented group management and make sure test coverage is complete.
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Magin committed Apr 14, 2020
1 parent 8a59360 commit 20aad69
Show file tree
Hide file tree
Showing 14 changed files with 818 additions and 133 deletions.
3 changes: 3 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[run]
omit = okta_oauth2/tests/*
source = okta_oauth2
1 change: 1 addition & 0 deletions okta_oauth2/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self):
try:
# Configuration object
self.org_url = settings.OKTA_AUTH["ORG_URL"]
self.superuser_group = settings.OKTA_AUTH.get("SUPERUSER_GROUP", None)

# OpenID Specific
self.client_id = settings.OKTA_AUTH["CLIENT_ID"]
Expand Down
8 changes: 4 additions & 4 deletions okta_oauth2/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from .exceptions import InvalidToken, TokenExpired
from .tokens import TokenValidator

config = Config()
logger = logging.getLogger(__name__)


Expand All @@ -17,6 +16,7 @@ class OktaMiddleware:
"""

def __init__(self, get_response):
self.config = Config()
self.get_response = get_response

def __call__(self, request):
Expand All @@ -41,14 +41,14 @@ def __call__(self, request):
try:
try:
validator = TokenValidator(
config, request.COOKIES["okta-oauth-nonce"], request
self.config, request.COOKIES["okta-oauth-nonce"], request
)
validator.validate_token(request.session["tokens"]["id_token"])
except TokenExpired:
logger.debug("Token has expired.")
if "refresh_token" in request.session["tokens"]:
logger.debug("Refresh token available... Refreshing.")
validator = TokenValidator(config, None, request)
validator = TokenValidator(self.config, None, request)
validator.tokens_from_refresh_token(
request.session["tokens"]["refresh_token"]
)
Expand All @@ -63,4 +63,4 @@ def __call__(self, request):
return response

def is_public_url(self, url):
return any(public_url.match(url) for public_url in config.public_urls)
return any(public_url.match(url) for public_url in self.config.public_urls)
22 changes: 22 additions & 0 deletions okta_oauth2/tests/settings.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

SECRET_KEY = "imasecretlol"

DATABASES = {"default": {"NAME": "test.db", "ENGINE": "django.db.backends.sqlite3"}}
Expand All @@ -20,3 +22,23 @@
}

ROOT_URLCONF = "okta_oauth2.tests.urls"

AUTHENTICATION_BACKENDS = ("okta_oauth2.backend.OktaBackend",)

TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"APP_DIRS": True,
"DIRS": [os.path.join(os.path.dirname(__file__), "templates"),],
"OPTIONS": {
"context_processors": [
# Django builtin
"django.template.context_processors.debug",
"django.template.context_processors.media",
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
]
},
},
]
Empty file.
29 changes: 29 additions & 0 deletions okta_oauth2/tests/test_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from unittest.mock import Mock, patch

from okta_oauth2.backend import OktaBackend


def test_backend_authenticate_requires_code_and_nonce(rf):
"""
the authenticate method on the custom backend requires both
an auth code and a nonce. If either aren't provided then
authenitcate should return None
"""
backend = OktaBackend()
assert backend.authenticate(rf) is None


def test_authenticate_returns_a_user(rf, django_user_model):
"""
We can't do the real authentication but we do need to make sure a
real user is returned from the backend authenticate method if the
TokenValidator succeeds, so fake success and see what happens.
"""
user = django_user_model.objects.create_user("testuser", "testuser@example.com")

with patch(
"okta_oauth2.backend.TokenValidator.tokens_from_auth_code",
Mock(return_value=(user, None)),
):
backend = OktaBackend()
assert backend.authenticate(rf, auth_code="123456", nonce="imanonce") == user
49 changes: 49 additions & 0 deletions okta_oauth2/tests/test_conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import re

import pytest
from django.core.exceptions import ImproperlyConfigured
from okta_oauth2.conf import Config
from okta_oauth2.tests.utils import update_okta_settings


def test_conf_raises_error_if_no_settings(settings):
"""
if there's no OKTA_AUTH in settings then we should
be raising an ImproperlyConfigured exception.
"""
del settings.OKTA_AUTH
with pytest.raises(ImproperlyConfigured):
Config()


def test_public_named_urls_are_built(settings):
"""
We should have reversed url regexes to match against
in our config objects.
"""
settings.OKTA_AUTH = update_okta_settings(
settings.OKTA_AUTH, "PUBLIC_NAMED_URLS", ("named-url",)
)
config = Config()
assert config.public_urls == [
re.compile("^/named/$"),
re.compile("^/accounts/login/$"),
re.compile("^/accounts/logout/$"),
re.compile("^/accounts/oauth2/callback/$"),
]


def test_invalid_public_named_urls_are_ignored(settings):
"""
We don't want to crash if our public named urls don't
exist, instead just skip it.
"""
settings.OKTA_AUTH = update_okta_settings(
settings.OKTA_AUTH, "PUBLIC_NAMED_URLS", ("not-a-valid-url",)
)
config = Config()
assert config.public_urls == [
re.compile("^/accounts/login/$"),
re.compile("^/accounts/logout/$"),
re.compile("^/accounts/oauth2/callback/$"),
]
71 changes: 59 additions & 12 deletions okta_oauth2/tests/test_middleware.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
from unittest.mock import Mock, patch

from django.http import HttpResponse
from django.test import RequestFactory
from django.urls import reverse
from okta_oauth2.exceptions import TokenExpired
from okta_oauth2.middleware import OktaMiddleware
from okta_oauth2.tests.utils import build_token
from okta_oauth2.tests.utils import build_id_token, update_okta_settings

rf = RequestFactory()


def test_no_token_redirects_to_login():
def test_no_token_redirects_to_login(rf):
"""
If there's no token in the session then we should be
redirecting to the login.
Expand All @@ -23,7 +20,7 @@ def test_no_token_redirects_to_login():
assert response.url == reverse("okta_oauth2:login")


def test_invalid_token_redirects_to_login():
def test_invalid_token_redirects_to_login(rf):
"""
It there's a token but it's invalid we should be
redirecting to the login.
Expand All @@ -37,15 +34,15 @@ def test_invalid_token_redirects_to_login():
assert response.url == reverse("okta_oauth2:login")


def test_valid_token_returns_response():
def test_valid_token_returns_response(rf):
"""
If we have a valid token we should be returning the normal
response from the middleware.
"""

nonce = "123456"
# We're building a token here that we know will be valid
token = build_token(nonce=nonce)
token = build_id_token(nonce=nonce)

with patch(
"okta_oauth2.middleware.TokenValidator._jwks", Mock(return_value="secret")
Expand All @@ -58,17 +55,17 @@ def test_valid_token_returns_response():
assert response.status_code == 200


def test_token_expired_triggers_refresh():
def test_token_expired_triggers_refresh(rf):
"""
Test that an expired token triggers
an attempt at refreshing the token.
"""
raises_token_expired = Mock()
raises_token_expired.side_effect = TokenExpired

with patch("okta_oauth2.middleware.TokenValidator.validate_token"), patch(
"okta_oauth2.middleware.TokenValidator.tokens_from_refresh_token"
):
with patch(
"okta_oauth2.middleware.TokenValidator.validate_token", raises_token_expired
), patch("okta_oauth2.middleware.TokenValidator.tokens_from_refresh_token"):

request = rf.get("/")
request.COOKIES["okta-oauth-nonce"] = "123456"
Expand All @@ -81,3 +78,53 @@ def test_token_expired_triggers_refresh():
mw = OktaMiddleware(Mock(return_value=HttpResponse()))
response = mw(request)
assert response.status_code == 200


def test_token_expired_triggers_refresh_with_no_refresh(rf):
"""
Test that an expired token triggers
an attempt at refreshing the token. In this situation we
don't have a refresh token so we should be redirecting back
to login.
"""
raises_token_expired = Mock()
raises_token_expired.side_effect = TokenExpired

with patch(
"okta_oauth2.middleware.TokenValidator.validate_token", raises_token_expired
), patch("okta_oauth2.middleware.TokenValidator.tokens_from_refresh_token"):

request = rf.get("/")
request.COOKIES["okta-oauth-nonce"] = "123456"
request.session = {"tokens": {"id_token": "imanexpiredtoken"}}
mw = OktaMiddleware(Mock(return_value=HttpResponse()))
response = mw(request)
assert response.status_code == 302
assert response.url == reverse("okta_oauth2:login")


def test_middleware_allows_public_url(settings, rf):
"""
A URL that has been defined as a public url
should just pass through our middleware.
"""
settings.OKTA_AUTH = update_okta_settings(
settings.OKTA_AUTH, "PUBLIC_NAMED_URLS", ("named-url",)
)
request = rf.get("/named/")
request.session = {}
mw = OktaMiddleware(Mock(return_value=HttpResponse()))
response = mw(request)
assert response.status_code == 200


def test_unauthorized_post_returns_401(settings, rf):
"""
redirecting a POST is bad form so just return
a 401 Unauthorized response if no token is there.
"""
request = rf.post("/named/")
request.session = {}
mw = OktaMiddleware(Mock(return_value=HttpResponse()))
response = mw(request)
assert response.status_code == 401
Loading

0 comments on commit 20aad69

Please sign in to comment.