Skip to content

Commit

Permalink
add group management
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Magin committed Apr 15, 2020
1 parent 142aefd commit b4ef5aa
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 1 deletion.
3 changes: 3 additions & 0 deletions okta_oauth2/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ def __init__(self):
try:
# Configuration object
self.org_url = settings.OKTA_AUTH["ORG_URL"]
# Make users in this okta group superusers
self.superuser_group = settings.OKTA_AUTH.get("SUPERUSER_GROUP", None)
# Allow django-okta-auth to add groups
self.manage_groups = settings.OKTA_AUTH.get("MANAGE_GROUPS", False)

# OpenID Specific
self.client_id = settings.OKTA_AUTH["CLIENT_ID"]
Expand Down
63 changes: 63 additions & 0 deletions okta_oauth2/tests/test_token_validator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from unittest.mock import MagicMock, Mock, patch

import pytest
from django.contrib.auth.models import Group
from django.contrib.sessions.middleware import SessionMiddleware
from django.core.cache import caches
from django.utils.timezone import now
Expand Down Expand Up @@ -68,6 +69,14 @@ def get_superuser_token_result(self, code):
}


def get_normal_user_with_groups_token(self, code):
return {
"access_token": build_access_token(),
"id_token": build_id_token(groups=["one", "two"]),
"refresh_token": "refresh",
}


def add_session(req):
mw = SessionMiddleware()
mw.process_request(req)
Expand Down Expand Up @@ -368,3 +377,57 @@ def test_unmatching_nonce_raises_error(rf):
), pytest.raises(NonceDoesNotMatch):
tv = TokenValidator(c, "defaultnonce", rf.get("/"))
tv.validate_token(token)


@pytest.mark.django_db
def test_groups_are_created_and_user_added(rf, settings, django_user_model):
"""
If MANAGE_GROUPS is true the groups should be created and the user
should be added to them.
"""
settings.OKTA_AUTH = update_okta_settings(settings.OKTA_AUTH, "MANAGE_GROUPS", True)

c = Config()
req = rf.get("/")
add_session(req)

with patch(
"okta_oauth2.tokens.TokenValidator.call_token_endpoint",
get_normal_user_with_groups_token,
), patch("okta_oauth2.tokens.TokenValidator._jwks", Mock(return_value="secret")):
tv = TokenValidator(c, "defaultnonce", req)
user, tokens = tv.tokens_from_refresh_token("refresh")

groups = Group.objects.all()
assert [("one",), ("two",)] == list(groups.values_list("name"))
assert list(user.groups.all()) == list(Group.objects.all())


@pytest.mark.django_db
def test_user_is_removed_from_groups(rf, settings, django_user_model):
"""
When MANAGE_GROUPS is true a user should be removed from a
group if it's not included in the token response.
"""
settings.OKTA_AUTH = update_okta_settings(settings.OKTA_AUTH, "MANAGE_GROUPS", True)

user = django_user_model._default_manager.create_user(
username="fakemail@notreal.com", email="fakemail@notreal.com"
)
group = Group.objects.create(name="test-group")

user.groups.add(group)

c = Config()
req = rf.get("/")
add_session(req)

with patch(
"okta_oauth2.tokens.TokenValidator.call_token_endpoint",
get_normal_user_with_groups_token,
), patch("okta_oauth2.tokens.TokenValidator._jwks", Mock(return_value="secret")):
tv = TokenValidator(c, "defaultnonce", req)
user, tokens = tv.tokens_from_refresh_token("refresh")

groups = user.groups.all()
assert [("one",), ("two",)] == list(groups.values_list("name"))
16 changes: 16 additions & 0 deletions okta_oauth2/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import jwt as jwt_python
import requests
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.core.cache import caches
from django.db.models import Q
from jose import jws, jwt
from jose.exceptions import JWSError, JWTError

Expand Down Expand Up @@ -59,6 +61,17 @@ def tokens_from_refresh_token(self, refresh_token):
result = self.call_token_endpoint(data)
return self.handle_token_result(result)

def manage_groups(self, user, groups):
for group in groups:
group = Group(name=group)
group.save()
user.groups.add(group)

removed_groups = user.groups.filter(~Q(name__in=groups))

for group in removed_groups:
user.groups.remove(group)

def handle_token_result(self, token_result):
tokens = {}

Expand Down Expand Up @@ -90,6 +103,9 @@ def handle_token_result(self, token_result):
username=claims["email"], email=claims["email"]
)

if self.config.manage_groups:
self.manage_groups(user, claims["groups"])

if "access_token" in token_result:
tokens["access_token"] = token_result["access_token"]

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "django-okta-auth"
version = "0.4.0"
version = "0.5.0"
description = "Django Authentication for Okta OpenID"
authors = ["Matt Magin <matt.magin@cmv.com.au>"]
license = "MIT"
Expand Down
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[pytest]
addopts = -vs --tb=short
DJANGO_SETTINGS_MODULE = okta_oauth2.tests.settings
junit_family=xunit2

0 comments on commit b4ef5aa

Please sign in to comment.