Skip to content

Commit

Permalink
Add JWT Token authenticator
Browse files Browse the repository at this point in the history
  • Loading branch information
stveit committed Nov 18, 2022
1 parent ee83f66 commit 1eff99c
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
61 changes: 60 additions & 1 deletion src/argus/auth/authentication.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from datetime import timedelta
from urllib.request import urlopen
import json
import jwt

from django.conf import settings
from django.utils import timezone
from rest_framework.authentication import TokenAuthentication
from rest_framework.authentication import TokenAuthentication, BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed

from .models import User


class ExpiringTokenAuthentication(TokenAuthentication):
EXPIRATION_DURATION = timedelta(days=settings.AUTH_TOKEN_EXPIRES_AFTER_DAYS)
Expand All @@ -17,3 +22,57 @@ def authenticate_credentials(self, key):
raise AuthenticationFailed("Token has expired.")

return user, token


class JWTAuthentication(BaseAuthentication):
def authenticate(self, request):
try:
raw_token = self.get_raw_jwt_token(request)
except ValueError:
return None
try:
validated_token = jwt.decode(
jwt=raw_token,
algorithms=["RS256", "RS384", "RS512"],
key=self.get_public_key(),
options={
"require": [
"exp",
"nbf",
"aud",
"iss",
"sub",
]
},
audience=settings.JWT_AUDIENCE,
issuer=settings.JWT_ISSUER,
)
except jwt.exceptions.PyJWTError as e:
raise AuthenticationFailed(f"Error validating JWT token: {e}")
username = validated_token["sub"]
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
raise AuthenticationFailed(f"No user found for username {username}")

return user, validated_token

def get_public_key(self):
response = urlopen(settings.JWK_ENDPOINT)
jwks = json.loads(response.read())
jwk = jwks["keys"][0]
public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(jwk))
return public_key

def get_raw_jwt_token(self, request):
"""Raises ValueError if a jwt token could not be found"""
auth_header = request.META.get("HTTP_AUTHORIZATION")
if not auth_header:
raise ValueError("No Authorization header found")
try:
scheme, token = auth_header.split()
except ValueError as e:
raise ValueError(f"Failed to parse Authorization header: {e}")
if scheme != settings.JWT_AUTH_SCHEME:
raise ValueError(f"Invalid Authorization scheme: {scheme}")
return token
6 changes: 6 additions & 0 deletions src/argus/site/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
"argus.auth.authentication.ExpiringTokenAuthentication",
# For BrowsableAPIRenderer
"rest_framework.authentication.SessionAuthentication",
"argus.auth.authentication.JWTAuthentication",
),
"DEFAULT_PERMISSION_CLASSES": ("rest_framework.permissions.IsAuthenticated",),
"DEFAULT_RENDERER_CLASSES": (
Expand Down Expand Up @@ -301,3 +302,8 @@
#
# SOCIAL_AUTH_DATAPORTEN_FEIDE_KEY = SOCIAL_AUTH_DATAPORTEN_KEY
# SOCIAL_AUTH_DATAPORTEN_FEIDE_SECRET = SOCIAL_AUTH_DATAPORTEN_SECRET

JWK_ENDPOINT = get_str_env("JWK_ENDPOINT")
JWT_ISSUER = get_str_env("JWT_ISSUER")
JWT_AUDIENCE = get_str_env("JWT_AUDIENCE")
JWT_AUTH_SCHEME = get_str_env("JWT_AUTH_SCHEME")

0 comments on commit 1eff99c

Please sign in to comment.