Skip to content

Commit

Permalink
[Fixes #8232] REST API forbids saving maps and apps to non-admin and …
Browse files Browse the repository at this point in the history
…non-owners (#8244)

* [Fixes #8232] REST API forbids saving maps and apps to non-admin and non-owners

* [CircleCI] Test fixes
  • Loading branch information
Alessio Fabiani authored Oct 15, 2021
1 parent d433676 commit 314bb62
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 8 deletions.
17 changes: 11 additions & 6 deletions geonode/base/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
from rest_framework import permissions
from rest_framework.filters import BaseFilterBackend

from geonode.security.utils import get_resources_with_perms
from geonode.security.utils import (
get_users_with_perms,
get_resources_with_perms)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -113,14 +115,17 @@ def has_object_permission(self, request, view, obj):
return True

# Instance must have an attribute named `owner`.
_request_matches = False
if isinstance(obj, get_user_model()) and obj == request.user:
return True
_request_matches = True
elif hasattr(obj, 'owner'):
return obj.owner == request.user
_request_matches = obj.owner == request.user
elif hasattr(obj, 'user'):
return obj.user == request.user
else:
return False
_request_matches = obj.user == request.user

if not _request_matches:
_request_matches = request.user in get_users_with_perms(obj)
return _request_matches


class IsOwnerOrReadOnly(IsOwnerOrAdmin):
Expand Down
135 changes: 134 additions & 1 deletion geonode/base/api/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ def test_set_thumbnail_from_bbox_from_logged_user_for_existing_dataset_raise_exp
Given a logged User and an existing dataset, should raise a ThumbnailException.
"""
# Admin
self.client.login(username="admin", password="admin")
self.assertTrue(self.client.login(username='admin', password='admin'))
dataset_id = Dataset.objects.first().resourcebase_ptr_id
url = reverse('base-resources-set-thumb-from-bbox', args=[dataset_id])
payload = {
Expand All @@ -1126,3 +1126,136 @@ def test_set_thumbnail_from_bbox_from_logged_user_for_existing_dataset_raise_exp
}
self.assertEqual(response.status_code, 500)
self.assertEqual(expected, response.json())

def test_manager_can_edit_map(self):
"""
REST API must not forbid saving maps and apps to non-admin and non-owners.
"""
from uuid import uuid1
from geonode.maps.models import Map
_map = Map.objects.filter(uuid__isnull=False, owner__username='admin').first()
if not len(_map.uuid):
_map.uuid = str(uuid1)
_map.save()
resource = ResourceBase.objects.filter(uuid=_map.uuid).first()
bobby = get_user_model().objects.get(username='bobby')

# Add perms to Bobby
resource_perm_spec_patch = {
'users': [
{
'id': bobby.id,
'username': bobby.username,
'first_name': bobby.first_name,
'last_name': bobby.last_name,
'avatar': '',
'permissions': 'manage'
}
]
}

# Patch the resource perms
self.assertTrue(self.client.login(username='admin', password='admin'))
data = f"uuid={resource.uuid}&permissions={json.dumps(resource_perm_spec_patch)}"
set_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", 'permissions')
response = self.client.patch(set_perms_url, data=data, content_type='application/x-www-form-urlencoded')
self.assertEqual(response.status_code, 200)
self.assertIsNotNone(response.data.get('status'))
self.assertIsNotNone(response.data.get('status_url'))
status = response.data.get('status')
status_url = response.data.get('status_url')
_counter = 0
while _counter < 100 and status != ExecutionRequest.STATUS_FINISHED and status != ExecutionRequest.STATUS_FAILED:
response = self.client.get(status_url)
status = response.data.get('status')
sleep(3.0)
_counter += 1
logger.error(f"[{_counter}] GET {status_url} ----> {response.data}")
self.assertTrue(status, ExecutionRequest.STATUS_FINISHED)

get_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': _map.get_self_resource().pk})}/", 'permissions')
response = self.client.get(get_perms_url, format='json')
self.assertEqual(response.status_code, 200)
resource_perm_spec = response.data
self.assertEqual(
resource_perm_spec,
{
'users': [
{
'id': bobby.id,
'username': 'bobby',
'first_name': 'bobby',
'last_name': '',
'avatar': 'https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240',
'permissions': 'manage'
},
{
'id': 1,
'username': 'admin',
'first_name': 'admin',
'last_name': '',
'avatar': 'https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240',
'permissions': 'owner'
}
],
'organizations': [],
'groups': [
{
'id': 3,
'title': 'anonymous',
'name': 'anonymous',
'permissions': 'download'
},
{
'id': 2,
'title': 'Registered Members',
'name': 'registered-members',
'permissions': 'none'
}
]
}
)

# Fetch the map perms as user "bobby"
self.assertTrue(self.client.login(username='bobby', password='bob'))
response = self.client.get(get_perms_url, format='json')
self.assertEqual(response.status_code, 200)
resource_perm_spec = response.data
self.assertEqual(
resource_perm_spec,
{
'users': [
{
'id': 1,
'username': 'admin',
'first_name': 'admin',
'last_name': '',
'avatar': 'https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240',
'permissions': 'owner'
},
{
'id': bobby.id,
'username': 'bobby',
'first_name': 'bobby',
'last_name': '',
'avatar': 'https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240',
'permissions': 'manage'
}
],
'organizations': [],
'groups': [
{
'id': 3,
'title': 'anonymous',
'name': 'anonymous',
'permissions': 'download'
},
{
'id': 2,
'title': 'Registered Members',
'name': 'registered-members',
'permissions': 'none'
}
]
}
)
3 changes: 2 additions & 1 deletion geonode/base/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,9 @@ def resource_service_permissions(self, request, pk=None):
"""
config = Configuration.load()
resource = self.get_object()
_user_can_manage = request.user.has_perm('change_resourcebase', resource.get_self_resource()) or request.user.has_perm('change_resourcebase_permissions', resource.get_self_resource())
if config.read_only or config.maintenance or request.user.is_anonymous or not request.user.is_authenticated or \
resource is None or not request.user.has_perm('change_resourcebase', resource.get_self_resource()):
resource is None or not _user_can_manage:
return Response(status=status.HTTP_403_FORBIDDEN)
try:
perms_spec = PermSpec(resource.get_all_level_info(), resource)
Expand Down

0 comments on commit 314bb62

Please sign in to comment.