Skip to content

Commit

Permalink
Further coverage fixes, now with mocks. (#568)
Browse files Browse the repository at this point in the history
* Replace a few no-cover pragmas with mocks.
* Use log.exception to log exceptions.
  - Also set exc_info=True to ensure structlog deals with the stack trace.
* Updates for new user model, also add trivial 401 checks for hostpolicy endpoints.
* Only fetch user after `IsAuthenticated.has_permission()` has been called.
  - This is mostly due a coverage fix with the way IsAuthenticated is implemented the same way as the raising of NotAuthenticated in User.from_request.
* Make it so illegal CIDRs in range queries return 400/Bad request instead of empty results.
* Test put disallowed, add explicit toml support for coverage.
* Support `pragma: no cover` again.
* no cover-pragmas for settings.py.
  • Loading branch information
terjekv authored Dec 6, 2024
1 parent 1b7126c commit cee5c16
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 35 deletions.
5 changes: 3 additions & 2 deletions hostpolicy/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ class IsSuperOrHostPolicyAdminOrReadOnly(IsAuthenticated):
"""

def has_permission(self, request, view):
user = User.from_request(request)

if not super().has_permission(request, view):
# Not even reading is allowed if you're not authenticated
return False

user = User.from_request(request)

if request.method in SAFE_METHODS:
return True
if user.is_mreg_superuser_or_hostpolicy_admin:
Expand Down
9 changes: 9 additions & 0 deletions hostpolicy/api/v1/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ def test_unique_namespace(self):
self.assert_post('/api/v1/hostpolicy/atoms/', data)
self.assert_post_and_400('/api/v1/hostpolicy/roles/', data)

class HostPolicyBasicTestCase(MregAPITestCase):

def test_get_unauthenticated_401_unauthenticated(self):
"""Unauthenticated users should get 401 on all endpoints"""
self.client.logout()
self.assert_get_and_401('/api/v1/hostpolicy/roles/')
self.assert_get_and_401('/api/v1/hostpolicy/atoms/')



class HostPolicyRoleTestCase(MregAPITestCase):
"""This class defines the test suite for api/hostpolicyroles"""
Expand Down
19 changes: 8 additions & 11 deletions mreg/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ class IsSuperOrNetworkAdminMember(IsAuthenticated):
"""

def has_permission(self, request, view):
user = User.from_request(request)
import mreg.api.v1.views
if not super().has_permission(request, view):
return False

user = User.from_request(request)

if user.is_mreg_superuser:
return True
if user.is_mreg_network_admin:
Expand All @@ -71,9 +73,9 @@ class IsSuperOrGroupAdminOrReadOnly(IsAuthenticated):
"""

def has_permission(self, request, view):
user = User.from_request(request)
if not super().has_permission(request, view):
return False
user = User.from_request(request)
if request.method in SAFE_METHODS:
return True
return user.is_mreg_superuser or user.is_mreg_hostgroup_admin
Expand Down Expand Up @@ -133,11 +135,11 @@ class IsGrantedNetGroupRegexPermission(IsAuthenticated):
"""

def has_permission(self, request, view):
user = User.from_request(request)
# This method is called before the view is executed, so
# just do some preliminary checks.
if not super().has_permission(request, view):
return False
user = User.from_request(request)
if request.method in SAFE_METHODS:
return True
if user.is_mreg_superuser_or_admin:
Expand Down Expand Up @@ -193,9 +195,7 @@ def has_create_permission(self, request, view, validated_serializer):
hostname = data['host'].name
elif 'host' in data:
hostname, ips = self._get_hostname_and_ips(data['host'])
else: # pragma: no cover
# Testing these kinds of should-never-happen codepaths is hard.
# We have to basically mock a complete API call and then break it.
else:
raise exceptions.PermissionDenied(f"Unhandled view: {view}")

if ips and hostname:
Expand All @@ -213,9 +213,7 @@ def has_destroy_permission(self, request, view, validated_serializer):
pass
elif hasattr(obj, 'host'):
obj = obj.host
else: # pragma: no cover
# Testing these kinds of should-never-happen codepaths is hard.
# We have to basically mock a complete API call and then break it.
else:
raise exceptions.PermissionDenied(f"Unhandled view: {view}")
if _deny_superuser_only_names(name=obj.name, view=view, request=request):
return False
Expand Down Expand Up @@ -273,10 +271,9 @@ class HostGroupPermission(IsAuthenticated):
def has_permission(self, request, view):
# This method is called before the view is executed, so
# just do some preliminary checks.
user = User.from_request(request)

if not super().has_permission(request, view):
return False
user = User.from_request(request)
if request.method in SAFE_METHODS:
return True
if user.is_mreg_superuser or user.is_mreg_hostgroup_admin:
Expand Down
3 changes: 2 additions & 1 deletion mreg/api/v1/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import structlog
from django.db.models import Q
from django_filters import rest_framework as filters
from rest_framework import exceptions

from mreg.models.base import History
from mreg.models.host import BACnetID, Host, HostGroup, Ipaddress, PtrOverride
Expand Down Expand Up @@ -56,7 +57,7 @@ def filter(self, qs, value):
cidr = IPNetwork(value)
return qs.filter(**{f"{self.field_name}__net_contains_or_equals": str(cidr)})
except AddrFormatError:
return qs.none()
raise exceptions.ValidationError(f"Invalid CIDR: {value}")

class BACnetIDFilterSet(filters.FilterSet):
class Meta:
Expand Down
12 changes: 5 additions & 7 deletions mreg/api/v1/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from mreg.models.base import History

log = structlog.get_logger(__name__)
log = structlog.get_logger("mreg.history")


class DjangoJSONModelEncoder(DjangoJSONEncoder):
Expand Down Expand Up @@ -57,11 +57,10 @@ def save_log(self, action, serializer, data, orig_data=None):
data=json_data,
)

# We should never fail at performing a clean on the testdata itself.
try:
history.full_clean()
except ValidationError as e: # pragma: no cover
print(e)
except ValidationError:
log.exception("ValidationError occured during full_clean()", exc_info=True)
return
history.save()

Expand Down Expand Up @@ -100,11 +99,10 @@ def save_log_m2m_alteration(self, method, instance):
data=data,
)

# We should never fail at performing a clean on the testdata itself.
try:
history.full_clean()
except ValidationError as e: # pragma: no cover
print(e)
except ValidationError:
log.exception("ValidationError occured during full_clean()", exc_info=True)
return
history.save()

Expand Down
6 changes: 3 additions & 3 deletions mreg/api/v1/tests/test_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ def create_roles(
) -> Tuple[List[HostPolicyRole], List[HostPolicyAtom], List[Label]]:
"""Create roles."""

if not atoms:
if not atoms: # pragma: no cover
atoms = create_atoms(f"{name}atom", len(hosts))

if not labels:
if not labels: # pragma: no cover
labels = create_labels(f"{name}label", len(hosts))

if len(hosts) != len(atoms) or len(hosts) != len(labels):
if len(hosts) != len(atoms) or len(hosts) != len(labels): # pragma: no cover
raise ValueError("Hosts, Atoms, and Labels must be the same length.")

roles: List[HostPolicyRole] = []
Expand Down
47 changes: 47 additions & 0 deletions mreg/api/v1/tests/test_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from unittest import TestCase, mock
from django.core.exceptions import ValidationError
from mreg.api.v1.history import HistoryLog

class TestHistoryLog(TestCase):

@mock.patch('django.db.models.Model.full_clean')
@mock.patch('mreg.api.v1.history.HistoryLog.manipulate_data')
@mock.patch('mreg.api.v1.history.HistoryLog.get_jsondata')
def test_save_log_handles_validation_error(self, mock_get_jsondata, mock_manipulate_data, mock_full_clean):
mock_full_clean.side_effect = ValidationError('Test error')
mock_get_jsondata.return_value = "json data"
mock_manipulate_data.return_value = None

# Mocking Serializer and its Meta
mock_serializer = mock.Mock()
mock_meta = mock.Mock()
mock_meta.model = mock.Mock(__name__='TestModel')
mock_serializer.Meta = mock_meta
mock_serializer.data = {'id': 1, 'name': 'Test'}

data = {'key': 'value'}

# Creating HistoryLog instance
log = HistoryLog()
log.model = mock_meta.model
log.foreign_key_name = 'key'
log.request = mock.Mock(user=mock.Mock())
log.log_resource = 'test_resource'
log.m2m_field = 'test_field'
log.object = mock.Mock(name='TestObject', id=2)

# Mocking an instance for save_log_m2m_alteration method
instance = mock.Mock(id=1, name='TestInstance')

# Mocking a method for save_log_m2m_alteration method
method = mock.Mock(__name__='TestMethod')

with self.assertLogs('mreg.history', level='ERROR') as cm:
log.save_log('test_action', mock_serializer, data)
log.save_log_m2m_alteration(method, instance)

self.assertEqual(mock_full_clean.call_count, 2)

# Assert that the error was logged twice, with tracebacks.
self.assertTrue(cm.output[0].startswith('ERROR:mreg.history:'))
self.assertTrue(cm.output[1].startswith('ERROR:mreg.history:'))
54 changes: 53 additions & 1 deletion mreg/api/v1/tests/test_permissions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,60 @@
from rest_framework.test import APIClient

from unittest import mock

from django.test import RequestFactory
from rest_framework.exceptions import PermissionDenied
from rest_framework.test import APIClient, force_authenticate

from mreg.api.permissions import IsGrantedNetGroupRegexPermission

from .tests import MregAPITestCase


class TestIsGrantedNetGroupRegexPermission(MregAPITestCase):

@mock.patch('mreg.api.permissions.User.from_request')
@mock.patch('mreg.api.permissions.IsGrantedNetGroupRegexPermission.has_obj_perm', return_value=False)
@mock.patch('mreg.api.permissions.IsGrantedNetGroupRegexPermission._get_hostname_and_ips',
return_value=('hostname', ['ip']))
def test_unhandled_view(
self,
mock_get_hostname_and_ips,
mock_has_obj_perm,
mock_user_from_request
):
request = RequestFactory().post('/')
user = mock.Mock()
user.group_list = []
user.is_mreg_superuser = False
user.is_mreg_admin = False
request.user = user
# Make it so every time we call User.from_request, it returns the same user object
# that we created above.
mock_user_from_request.return_value = user
force_authenticate(request, user=user)

# Mock view that is not an instance of any of the checked classes
view = mock.Mock()

# Mock object that doesn't have 'host' attribute
view.get_object = mock.Mock(return_value=None)

# Mock serializer with data that doesn't have 'host' or 'ipaddress'
serializer = mock.Mock()
serializer.validated_data = {}

permission = IsGrantedNetGroupRegexPermission()

with self.assertRaises(PermissionDenied):
permission.has_create_permission(request, view, serializer)

with self.assertRaises(PermissionDenied):
permission.has_update_permission(request, view, serializer)

with self.assertRaises(PermissionDenied):
permission.has_destroy_permission(request, view, serializer)


class NetGroupRegexPermissionTestCase(MregAPITestCase):

data = {'group': 'testgroup', 'range': '10.0.0.0/24',
Expand Down
13 changes: 13 additions & 0 deletions mreg/api/v1/tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from unittest import skip

import unittest.mock as mock
from unittest_parametrize import ParametrizedTestCase, param, parametrize


from django.conf import settings
from django.contrib.auth import get_user_model
Expand Down Expand Up @@ -189,6 +191,17 @@ def create_reverse_zone(name='10.10.in-addr.arpa', primary_ns='ns.example.org',
return ReverseZone.objects.create(name=name, primary_ns=primary_ns, email=email)


class MregPutNotAllowedTestCase(ParametrizedTestCase, MregAPITestCase):

@parametrize(('path',), [
param('/hosts/'),
param('/networks/'),
param('/zones/forward/'),
])
def test_put_not_allowed(self, path: str):
response = self.client.put(self._create_path(path))
self.assertEqual(response.status_code, 405)

class APITestInternals(MregAPITestCase):
"""Test internal methods."""

Expand Down
6 changes: 3 additions & 3 deletions mreg/middleware/logging_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __call__(self, request: HttpRequest) -> HttpResponse:

try:
response = self.get_response(request)
except Exception as e:
except Exception as e: # pragma: no cover (this is somewhat tricky to properly test)
self.log_exception(request, e, start_time)
raise

Expand Down Expand Up @@ -80,7 +80,7 @@ def _get_request_header(
self, request: HttpRequest, header_key: str, meta_key: str
) -> str:
"""Get the value of a header from the request, either via headers or META."""
if hasattr(request, "headers"):
if hasattr(request, "headers"): # pragma: no cover
return request.headers.get(header_key)

return request.META.get(meta_key)
Expand Down Expand Up @@ -180,7 +180,7 @@ def log_response(

return response

def log_exception(self, request: HttpRequest, exception: Exception, start_time: float) -> None:
def log_exception(self, request: HttpRequest, exception: Exception, start_time: float) -> None: # pragma: no cover
"""Log an exception that occurred during request processing."""
end_time = time.time()
run_time_ms = (end_time - start_time) * 1000
Expand Down
10 changes: 5 additions & 5 deletions mregsite/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@

# If the log directory doesn't exist, create it.
log_dir = os.path.dirname(LOG_FILE_NAME)
if not os.path.exists(log_dir):
try:
if not os.path.exists(log_dir): # pragma: no cover
try: # pragma: no cover
os.makedirs(log_dir)
except OSError as e:
print(f"Failed to create log directory {log_dir}: {e}")
sys.exit(1)

# Check if the log file and directory is writable.
if not os.access(log_dir, os.W_OK):
if not os.access(log_dir, os.W_OK): # pragma: no cover
print(f"Log directory {log_dir} is not writable")
sys.exit(1)

# Check if LOG_FILE_NAME exists and if it is writable.
if os.path.exists(LOG_FILE_NAME) and not os.access(LOG_FILE_NAME, os.W_OK):
if os.path.exists(LOG_FILE_NAME) and not os.access(LOG_FILE_NAME, os.W_OK): # pragma: no cover
print(f"Log file {LOG_FILE_NAME} is not writable")
sys.exit(1)

Expand Down Expand Up @@ -248,7 +248,7 @@
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
structlog.dev.ConsoleRenderer(colors=True, sort_keys=False),
]
else:
else: # pragma: no cover
console_processors = [
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
structlog.processors.JSONRenderer(),
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dynamic = ["version"]
[dependency-groups]
dev = [
"tox-uv",
"coverage",
"coverage[toml]",
"pytest",
"pytest-django",
]
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ commands =
[coverage:report]
fail_under = 98
show_missing = true
exclude_lines =
exclude_also =
'pragma: no cover'
'def __repr__'

Expand Down

0 comments on commit cee5c16

Please sign in to comment.