diff --git a/geonode/base/api/permissions.py b/geonode/base/api/permissions.py index 5efc48bcc0b..696a15f1552 100644 --- a/geonode/base/api/permissions.py +++ b/geonode/base/api/permissions.py @@ -22,7 +22,9 @@ from rest_framework import permissions from rest_framework.filters import BaseFilterBackend -from geonode.security.utils import get_resources_with_perms +from geonode.security.utils import ( + get_users_with_perms, + get_resources_with_perms) logger = logging.getLogger(__name__) @@ -113,14 +115,17 @@ def has_object_permission(self, request, view, obj): return True # Instance must have an attribute named `owner`. + _request_matches = False if isinstance(obj, get_user_model()) and obj == request.user: - return True + _request_matches = True elif hasattr(obj, 'owner'): - return obj.owner == request.user + _request_matches = obj.owner == request.user elif hasattr(obj, 'user'): - return obj.user == request.user - else: - return False + _request_matches = obj.user == request.user + + if not _request_matches: + _request_matches = request.user in get_users_with_perms(obj) + return _request_matches class IsOwnerOrReadOnly(IsOwnerOrAdmin): diff --git a/geonode/base/api/tests.py b/geonode/base/api/tests.py index 7c8954896fa..e556f31d0d7 100644 --- a/geonode/base/api/tests.py +++ b/geonode/base/api/tests.py @@ -1111,7 +1111,7 @@ def test_set_thumbnail_from_bbox_from_logged_user_for_existing_dataset_raise_exp Given a logged User and an existing dataset, should raise a ThumbnailException. """ # Admin - self.client.login(username="admin", password="admin") + self.assertTrue(self.client.login(username='admin', password='admin')) dataset_id = Dataset.objects.first().resourcebase_ptr_id url = reverse('base-resources-set-thumb-from-bbox', args=[dataset_id]) payload = { @@ -1126,3 +1126,136 @@ def test_set_thumbnail_from_bbox_from_logged_user_for_existing_dataset_raise_exp } self.assertEqual(response.status_code, 500) self.assertEqual(expected, response.json()) + + def test_manager_can_edit_map(self): + """ + REST API must not forbid saving maps and apps to non-admin and non-owners. + """ + from uuid import uuid1 + from geonode.maps.models import Map + _map = Map.objects.filter(uuid__isnull=False, owner__username='admin').first() + if not len(_map.uuid): + _map.uuid = str(uuid1) + _map.save() + resource = ResourceBase.objects.filter(uuid=_map.uuid).first() + bobby = get_user_model().objects.get(username='bobby') + + # Add perms to Bobby + resource_perm_spec_patch = { + 'users': [ + { + 'id': bobby.id, + 'username': bobby.username, + 'first_name': bobby.first_name, + 'last_name': bobby.last_name, + 'avatar': '', + 'permissions': 'manage' + } + ] + } + + # Patch the resource perms + self.assertTrue(self.client.login(username='admin', password='admin')) + data = f"uuid={resource.uuid}&permissions={json.dumps(resource_perm_spec_patch)}" + set_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", 'permissions') + response = self.client.patch(set_perms_url, data=data, content_type='application/x-www-form-urlencoded') + self.assertEqual(response.status_code, 200) + self.assertIsNotNone(response.data.get('status')) + self.assertIsNotNone(response.data.get('status_url')) + status = response.data.get('status') + status_url = response.data.get('status_url') + _counter = 0 + while _counter < 100 and status != ExecutionRequest.STATUS_FINISHED and status != ExecutionRequest.STATUS_FAILED: + response = self.client.get(status_url) + status = response.data.get('status') + sleep(3.0) + _counter += 1 + logger.error(f"[{_counter}] GET {status_url} ----> {response.data}") + self.assertTrue(status, ExecutionRequest.STATUS_FINISHED) + + get_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': _map.get_self_resource().pk})}/", 'permissions') + response = self.client.get(get_perms_url, format='json') + self.assertEqual(response.status_code, 200) + resource_perm_spec = response.data + self.assertEqual( + resource_perm_spec, + { + 'users': [ + { + 'id': bobby.id, + 'username': 'bobby', + 'first_name': 'bobby', + 'last_name': '', + 'avatar': 'https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240', + 'permissions': 'manage' + }, + { + 'id': 1, + 'username': 'admin', + 'first_name': 'admin', + 'last_name': '', + 'avatar': 'https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240', + 'permissions': 'owner' + } + ], + 'organizations': [], + 'groups': [ + { + 'id': 3, + 'title': 'anonymous', + 'name': 'anonymous', + 'permissions': 'download' + }, + { + 'id': 2, + 'title': 'Registered Members', + 'name': 'registered-members', + 'permissions': 'none' + } + ] + } + ) + + # Fetch the map perms as user "bobby" + self.assertTrue(self.client.login(username='bobby', password='bob')) + response = self.client.get(get_perms_url, format='json') + self.assertEqual(response.status_code, 200) + resource_perm_spec = response.data + self.assertEqual( + resource_perm_spec, + { + 'users': [ + { + 'id': 1, + 'username': 'admin', + 'first_name': 'admin', + 'last_name': '', + 'avatar': 'https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240', + 'permissions': 'owner' + }, + { + 'id': bobby.id, + 'username': 'bobby', + 'first_name': 'bobby', + 'last_name': '', + 'avatar': 'https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240', + 'permissions': 'manage' + } + ], + 'organizations': [], + 'groups': [ + { + 'id': 3, + 'title': 'anonymous', + 'name': 'anonymous', + 'permissions': 'download' + }, + { + 'id': 2, + 'title': 'Registered Members', + 'name': 'registered-members', + 'permissions': 'none' + } + ] + } + ) diff --git a/geonode/base/api/views.py b/geonode/base/api/views.py index dcfd66cba0c..ae3591a3772 100644 --- a/geonode/base/api/views.py +++ b/geonode/base/api/views.py @@ -481,8 +481,9 @@ def resource_service_permissions(self, request, pk=None): """ config = Configuration.load() resource = self.get_object() + _user_can_manage = request.user.has_perm('change_resourcebase', resource.get_self_resource()) or request.user.has_perm('change_resourcebase_permissions', resource.get_self_resource()) if config.read_only or config.maintenance or request.user.is_anonymous or not request.user.is_authenticated or \ - resource is None or not request.user.has_perm('change_resourcebase', resource.get_self_resource()): + resource is None or not _user_can_manage: return Response(status=status.HTTP_403_FORBIDDEN) try: perms_spec = PermSpec(resource.get_all_level_info(), resource)