Skip to content

Commit

Permalink
Meta endpoint expansion (#562)
Browse files Browse the repository at this point in the history
* Update meta endpoints.

BREAKING CHANGE:

`meta/versions` is now gone, replaced with `meta/version` (server version) and `meta/libraries` (some core libraries).

* Use importlib.metadata as we're on python 3.10+.

* Add token-is-valid endpoint and update tests.

* Add user info endpoint.

This endpoint shows some meta information about the user in question. Normal users can see their own information, superusers, admins, group admins, and network admins can add `?username=foo` to see information about that user.

Information provided:

  - username
  - django status for the user (superuser, admin, active)
  - mreg status for the user (superuser, admin, group admin, network admin, hostpolicy admin, dns wildcard admin, and underscore admin).
  - group memberships
  - mreg permissions granted based on the groups one is a member of.

* Remove debugging.

* Fix distinction between requestor and target...

* Allow looking up oneself as a param.

* Add tests for news metas.
  • Loading branch information
terjekv authored Nov 23, 2024
1 parent 64a7cd9 commit 99c5aa9
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 11 deletions.
5 changes: 4 additions & 1 deletion mreg/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
urlpatterns = [
path('token-logout/', views.TokenLogout.as_view()),
path('token-auth/', views.ObtainExpiringAuthToken.as_view()),
path('token-is-valid/', views.TokenIsValid.as_view()),
path('meta/user', views.UserInfo.as_view()),
path('meta/version', views.MregVersion.as_view()),
path('meta/libraries', views.MetaVersions.as_view()),
path('meta/heartbeat', views.MetaHeartbeat.as_view()),
path('meta/versions', views.MetaVersions.as_view()),
]
36 changes: 31 additions & 5 deletions mreg/api/v1/tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,21 +260,26 @@ def test_token_rotation(self):

def test_token_usage(self):
old = ExpiringToken.objects.get(user=self.user)
self.assert_get("/api/token-is-valid/")
self.assert_get("/hosts/")
new = ExpiringToken.objects.get(user=self.user)
assert old.created == new.created
self.assertLess(old.last_used, new.last_used)

def test_is_active_false(self):
self.assert_get("/api/token-is-valid/")
self.assert_get("/hosts/")
self.user.is_active = False
self.user.save()
self.assert_get_and_401("/hosts/")
self.assert_get_and_401("/api/token-is-valid/")

def test_is_deleted(self):
self.assert_get("/hosts/")
self.assert_get("/api/token-is-valid/")
self.user.delete()
self.assert_get_and_401("/hosts/")
self.assert_get_and_401("/api/token-is-valid/")

def test_login_with_invalid_credentials(self):
self.client = APIClient()
Expand All @@ -287,15 +292,36 @@ def test_login_with_invalid_credentials(self):
class APIMetaTestCase(MregAPITestCase):
"""Test the meta API endpoint."""

def test_meta_versions_admin_200_ok(self):
response = self.assert_get_and_200("/api/meta/versions")
for key in ('rest_framework_version', 'django_version', 'python_version'):
def test_meta_libraries_admin_200_ok(self):
response = self.assert_get_and_200("/api/meta/libraries")
for key in ('djangorestframework', 'django', 'python'):
with self.subTest(key=key):
self.assertTrue(key in response.data)

def test_meta_versions_user_403_forbidden(self):
def test_meta_libraries_user_403_forbidden(self):
self.client = self.get_token_client(superuser=False)
self.assert_get_and_403("/api/meta/versions")
self.assert_get_and_403("/api/meta/libraries")

def test_meta_version_user_ok(self):
self.client = self.get_token_client(superuser=False)
response = self.assert_get("/api/meta/version")
self.assertTrue('version' in response.data)

def test_meta_user_info_self_200_ok(self):
self.client = self.get_token_client(superuser=False)
response = self.assert_get("/api/meta/user")
self.assertTrue('username' in response.data)

def test_meta_user_info_admin_other_target_200_ok(self):
response = self.assert_get("/api/meta/user?username=superuser")
self.assertTrue('username' in response.data)

def test_meta_user_info_user_other_target_403_forbidden(self):
self.client = self.get_token_client(superuser=False)
self.assert_get_and_403("/api/meta/user?username=superuser")

def test_meta_user_info_user_not_found_404_not_found(self):
self.assert_get_and_404("/api/meta/user?username=nonexistent")

def test_meta_heartbeat_user_200_ok(self):
self.client = self.get_token_client(superuser=False)
Expand Down
137 changes: 132 additions & 5 deletions mreg/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,46 @@
from typing import Any, cast

import django
from rest_framework import __version__ as res_version
from django.conf import settings
import structlog

from rest_framework import serializers, status
from rest_framework.authtoken.views import ObtainAuthToken
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.exceptions import AuthenticationFailed, PermissionDenied, NotFound
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from importlib.metadata import version

from psycopg2 import __libpq_version__ as libpq_version

from mreg.api.permissions import IsSuperOrNetworkAdminMember
from mreg.models.base import ExpiringToken
from mreg.models.auth import User
from mreg.models.network import NetGroupRegexPermission
from mreg.__about__ import __version__ as mreg_version

logger = structlog.getLogger(__name__)

start_time = int(time.time())

# Note the order here. This order is preserved in the response.
# Also, we add libpq-data to the end of this list so letting psycopg2-binary
# be last makes the context of the libpq version more clear.
LIBRARIES_TO_REPORT = [
"djangorestframework",
"django-auth-ldap",
"django-filter",
"django-logging-json",
"django-netfields",
"gunicorn",
"sentry-sdk",
"structlog",
"rich",
"psycopg2-binary",
]


class ObtainExpiringAuthToken(ObtainAuthToken):

Expand Down Expand Up @@ -62,17 +88,118 @@ def post(self, request: Request):
request.user.delete()
return Response(status=status.HTTP_200_OK)

class TokenIsValid(APIView):

permission_classes = (IsAuthenticated,)

def get(self, request: Request):
return Response(status=status.HTTP_200_OK)

###
### User infomation views
####

class UserInfo(APIView):

permission_classes = (IsAuthenticated,)

def get(self, request: Request):
# Identify the requesting user
user = request.user
req_groups = user.groups.all()
req_is_mreg_superuser = settings.SUPERUSER_GROUP in [group.name for group in req_groups]
req_is_mreg_admin = settings.ADMINUSER_GROUP in [group.name for group in req_groups]
req_is_mreg_group_admin = settings.GROUPADMINUSER_GROUP in [group.name for group in req_groups]
req_is_mreg_network_admin = settings.NETWORK_ADMIN_GROUP in [group.name for group in req_groups]

# Determine target user (default is the requesting user)
username = request.query_params.get("username")
target_user = user

if username and username != user.username:
# Only allow access to other user data if the requester is an mreg superuser
if not req_is_mreg_superuser or req_is_mreg_admin or req_is_mreg_group_admin or req_is_mreg_network_admin:
raise PermissionDenied("You do not have permission to view other users' details.")
try:
target_user = User.objects.get(username=username)
except User.DoesNotExist:
raise NotFound(f"User with username '{username}' not found.")

# Gather target user's information
target_groups = target_user.groups.all()
target_permissions = NetGroupRegexPermission.objects.filter(
group__in=[group.name for group in target_groups]
)

is_mreg_superuser = settings.SUPERUSER_GROUP in [group.name for group in target_groups]
is_mreg_admin = settings.ADMINUSER_GROUP in [group.name for group in target_groups]
is_mreg_group_admin = settings.GROUPADMINUSER_GROUP in [group.name for group in target_groups]
is_mreg_network_admin = settings.NETWORK_ADMIN_GROUP in [group.name for group in target_groups]
is_mreg_hostpolicy_admin = settings.HOSTPOLICYADMIN_GROUP in [group.name for group in target_groups]
is_mreg_dns_wildcard_admin = settings.DNS_WILDCARD_GROUP in [group.name for group in target_groups]
is_mreg_dns_underscore_admin = settings.DNS_UNDERSCORE_GROUP in [group.name for group in target_groups]

data = {
"username": target_user.username,
"django_status": {
"superuser": target_user.is_superuser,
"staff": target_user.is_staff,
"active": target_user.is_active,
},
"mreg_status": {
"superuser": is_mreg_superuser,
"admin": is_mreg_admin,
"group_admin": is_mreg_group_admin,
"network_admin": is_mreg_network_admin,
"hostpolicy_admin": is_mreg_hostpolicy_admin,
"dns_wildcard_admin": is_mreg_dns_wildcard_admin,
"underscore_admin": is_mreg_dns_underscore_admin
},
"groups": [group.name for group in target_groups],
"permissions": [
{
"group": permission.group,
"range": permission.range,
"regex": permission.regex,
"labels": [label.name for label in permission.labels.all()],
}
for permission in target_permissions
],
}

return Response(status=status.HTTP_200_OK, data=data)

###
### Introspection views
###
class MregVersion(APIView):

permission_classes = (IsAuthenticated,)

def get(self, request: Request):
data = {
"version": mreg_version,
}
return Response(status=status.HTTP_200_OK, data=data)

class MetaVersions(APIView):

permission_classes = (IsSuperOrNetworkAdminMember,)

def get(self, request: Request):
data = {
"django_version": django.get_version(),
"rest_framework_version": res_version,
"python_version": platform.python_version(),
"python": platform.python_version(),
"django": django.get_version(),
}

for library in LIBRARIES_TO_REPORT:
try:
data[library] = version(library)
except Exception as e:
logger.warning(event="library", reason=f"Failed to get version for {library}: {e}")
data[library] = "<unknown>"

data["libpq"] = str(libpq_version)
return Response(status=status.HTTP_200_OK, data=data)


Expand Down

0 comments on commit 99c5aa9

Please sign in to comment.