Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fixes #8232] REST API forbids saving maps and apps to non-admin and non-owners #8244

Merged
merged 3 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions geonode/base/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
from rest_framework import permissions
from rest_framework.filters import BaseFilterBackend

from geonode.security.utils import get_resources_with_perms
from geonode.security.utils import (
get_users_with_perms,
get_resources_with_perms)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -113,14 +115,17 @@ def has_object_permission(self, request, view, obj):
return True

# Instance must have an attribute named `owner`.
_request_matches = False
if isinstance(obj, get_user_model()) and obj == request.user:
return True
_request_matches = True
elif hasattr(obj, 'owner'):
return obj.owner == request.user
_request_matches = obj.owner == request.user
elif hasattr(obj, 'user'):
return obj.user == request.user
else:
return False
_request_matches = obj.user == request.user

if not _request_matches:
_request_matches = request.user in get_users_with_perms(obj)
return _request_matches


class IsOwnerOrReadOnly(IsOwnerOrAdmin):
Expand Down
135 changes: 134 additions & 1 deletion geonode/base/api/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ def test_set_thumbnail_from_bbox_from_logged_user_for_existing_dataset_raise_exp
Given a logged User and an existing dataset, should raise a ThumbnailException.
"""
# Admin
self.client.login(username="admin", password="admin")
self.assertTrue(self.client.login(username='admin', password='admin'))
dataset_id = Dataset.objects.first().resourcebase_ptr_id
url = reverse('base-resources-set-thumb-from-bbox', args=[dataset_id])
payload = {
Expand All @@ -1126,3 +1126,136 @@ def test_set_thumbnail_from_bbox_from_logged_user_for_existing_dataset_raise_exp
}
self.assertEqual(response.status_code, 500)
self.assertEqual(expected, response.json())

def test_manager_can_edit_map(self):
"""
REST API must not forbid saving maps and apps to non-admin and non-owners.
"""
from uuid import uuid1
from geonode.maps.models import Map
_map = Map.objects.filter(uuid__isnull=False, owner__username='admin').first()
if not len(_map.uuid):
_map.uuid = str(uuid1)
_map.save()
resource = ResourceBase.objects.filter(uuid=_map.uuid).first()
bobby = get_user_model().objects.get(username='bobby')

# Add perms to Bobby
resource_perm_spec_patch = {
'users': [
{
'id': bobby.id,
'username': bobby.username,
'first_name': bobby.first_name,
'last_name': bobby.last_name,
'avatar': '',
'permissions': 'manage'
}
]
}

# Patch the resource perms
self.assertTrue(self.client.login(username='admin', password='admin'))
data = f"uuid={resource.uuid}&permissions={json.dumps(resource_perm_spec_patch)}"
set_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", 'permissions')
response = self.client.patch(set_perms_url, data=data, content_type='application/x-www-form-urlencoded')
self.assertEqual(response.status_code, 200)
self.assertIsNotNone(response.data.get('status'))
self.assertIsNotNone(response.data.get('status_url'))
status = response.data.get('status')
status_url = response.data.get('status_url')
_counter = 0
while _counter < 100 and status != ExecutionRequest.STATUS_FINISHED and status != ExecutionRequest.STATUS_FAILED:
response = self.client.get(status_url)
status = response.data.get('status')
sleep(3.0)
_counter += 1
logger.error(f"[{_counter}] GET {status_url} ----> {response.data}")
self.assertTrue(status, ExecutionRequest.STATUS_FINISHED)

get_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': _map.get_self_resource().pk})}/", 'permissions')
response = self.client.get(get_perms_url, format='json')
self.assertEqual(response.status_code, 200)
resource_perm_spec = response.data
self.assertEqual(
resource_perm_spec,
{
'users': [
{
'id': bobby.id,
'username': 'bobby',
'first_name': 'bobby',
'last_name': '',
'avatar': 'https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240',
'permissions': 'manage'
},
{
'id': 1,
'username': 'admin',
'first_name': 'admin',
'last_name': '',
'avatar': 'https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240',
'permissions': 'owner'
}
],
'organizations': [],
'groups': [
{
'id': 3,
'title': 'anonymous',
'name': 'anonymous',
'permissions': 'download'
},
{
'id': 2,
'title': 'Registered Members',
'name': 'registered-members',
'permissions': 'none'
}
]
}
)

# Fetch the map perms as user "bobby"
self.assertTrue(self.client.login(username='bobby', password='bob'))
response = self.client.get(get_perms_url, format='json')
self.assertEqual(response.status_code, 200)
resource_perm_spec = response.data
self.assertEqual(
resource_perm_spec,
{
'users': [
{
'id': 1,
'username': 'admin',
'first_name': 'admin',
'last_name': '',
'avatar': 'https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240',
'permissions': 'owner'
},
{
'id': bobby.id,
'username': 'bobby',
'first_name': 'bobby',
'last_name': '',
'avatar': 'https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240',
'permissions': 'manage'
}
],
'organizations': [],
'groups': [
{
'id': 3,
'title': 'anonymous',
'name': 'anonymous',
'permissions': 'download'
},
{
'id': 2,
'title': 'Registered Members',
'name': 'registered-members',
'permissions': 'none'
}
]
}
)
3 changes: 2 additions & 1 deletion geonode/base/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,9 @@ def resource_service_permissions(self, request, pk=None):
"""
config = Configuration.load()
resource = self.get_object()
_user_can_manage = request.user.has_perm('change_resourcebase', resource.get_self_resource()) or request.user.has_perm('change_resourcebase_permissions', resource.get_self_resource())
if config.read_only or config.maintenance or request.user.is_anonymous or not request.user.is_authenticated or \
resource is None or not request.user.has_perm('change_resourcebase', resource.get_self_resource()):
resource is None or not _user_can_manage:
return Response(status=status.HTTP_403_FORBIDDEN)
try:
perms_spec = PermSpec(resource.get_all_level_info(), resource)
Expand Down