diff --git a/app/stac_api/admin.py b/app/stac_api/admin.py index 189d2a6f..85fd4381 100644 --- a/app/stac_api/admin.py +++ b/app/stac_api/admin.py @@ -29,6 +29,7 @@ from stac_api.models import Provider from stac_api.utils import build_asset_href from stac_api.utils import get_query_params +from stac_api.validators import validate_href_url from stac_api.validators import validate_text_to_geometry logger = logging.getLogger(__name__) @@ -387,8 +388,50 @@ def get_fieldsets(self, request, obj=None): return fields +class AssetAdminForm(forms.ModelForm): + + def __init__(self, *args, **kwargs): + """If it's an external asset, we switch the file field to a char field""" + super().__init__(*args, **kwargs) + if self.instance: + are_external_assets_allowed = self.instance.item.collection.allow_external_assets + + if are_external_assets_allowed: + external_field = self.fields['is_external'] + external_field.help_text = ( + _('Whether this asset is hosted externally. Save the form in ' + 'order to toggle the file field between input and file widget.') + ) + + if self.instance.is_external: + # can't just change the widget, otherwise it is impossible to + # change the value! + self.fields['file'] = forms.CharField() + + # make it a bit wider + self.fields['file'].widget.attrs['size'] = 150 + self.fields['file'].widget.attrs['placeholder' + ] = 'https://map.geo.admin.ch/external.jpg' + + def clean_file(self): + if self.instance: + external_changed = 'is_external' in self.changed_data + is_external = self.cleaned_data.get('is_external') + + # if we're just changing from internal to external, so we don't + # validate the url, because it is potentially still the previous, + # internal file + if is_external and not external_changed: + file = self.cleaned_data.get('file') + + validate_href_url(file, self.instance.item.collection) + + return self.cleaned_data.get('file') + + @admin.register(Asset) class AssetAdmin(admin.ModelAdmin): + form = AssetAdminForm class Media: js = ('js/admin/asset_help_search.js', 'js/admin/asset_external_fields.js') @@ -476,29 +519,6 @@ def href(self, instance): return path return build_asset_href(self.request, path) - def render_change_form(self, request, context, *args, **kwargs): - """Make the file field be a text input if asset is external""" - obj = kwargs['obj'] - if obj is not None: - are_external_assets_allowed = obj.item.collection.allow_external_assets - - form_instance = context['adminform'].form - - if are_external_assets_allowed: - external_field = form_instance.fields['is_external'] - external_field.help_text = ( - _('Whether this asset is hosted externally. Save the form in ' - 'order to toggle the file field between input and file widget.') - ) - - if obj.is_external: - form_instance.fields['file'].widget = forms.TextInput( - attrs={ - 'size': 150, 'placeholder': 'https://map.geo.admin.ch/external.jpg' - } - ) - return super().render_change_form(request, context, *args, **kwargs) - def get_fieldsets(self, request, obj=None): """Build the different field sets for the admin page diff --git a/app/stac_api/serializers.py b/app/stac_api/serializers.py index 6c1c26c1..2fe2decf 100644 --- a/app/stac_api/serializers.py +++ b/app/stac_api/serializers.py @@ -2,12 +2,9 @@ from collections import OrderedDict from urllib.parse import urlparse -import requests - from django.conf import settings from django.contrib.gis.geos import GEOSGeometry -from django.core import exceptions -from django.core.validators import URLValidator +from django.core.exceptions import ValidationError as CoreValidationError from django.utils.translation import gettext_lazy as _ from rest_framework import serializers @@ -41,6 +38,7 @@ from stac_api.validators import validate_checksum_multihash_sha256 from stac_api.validators import validate_content_encoding from stac_api.validators import validate_geoadmin_variant +from stac_api.validators import validate_href_url from stac_api.validators import validate_item_properties_datetimes from stac_api.validators import validate_md5_parts from stac_api.validators import validate_name @@ -402,76 +400,6 @@ def to_representation(self, instance): ) return representation - def _validate_general_url_pattern(self, url): - try: - validator = URLValidator() - validator(url) - except exceptions.ValidationError as exc: - errors = {'href': _('Invalid URL provided')} - raise serializers.ValidationError(code='payload', detail=errors) from exc - - def _validate_configured_url_pattern(self, url): - """Validate the URL against the whitelist""" - whitelist = self.collection.external_asset_whitelist - - for entry in whitelist: - if url.startswith(entry): - return True - - logger.info( - "Attempted external asset upload didn't match the whitelist", - extra={ - 'url': url, 'whitelist': whitelist, 'collection': self.collection - } - ) - - # none of the prefixes matches - errors = {'href': _("Invalid URL provided. It doesn't match the collection whitelist")} - raise serializers.ValidationError(code='payload', detail=errors) - - def _validate_reachability(self, url): - unreachable_error = {'href': _('Provided URL is unreachable')} - try: - response = requests.head(url, timeout=settings.EXTERNAL_URL_REACHABLE_TIMEOUT) - - if response.status_code > 400: - logger.info( - "Attempted external asset upload failed the reachability check", - extra={ - 'url': url, - 'collection': self.collection, - 'response': response, - } - ) - raise serializers.ValidationError(code='payload', detail=unreachable_error) - except requests.Timeout as exc: - logger.info( - "Attempted external asset upload resulted in a timeout", - extra={ - 'url': url, - 'collection': self.collection, - 'exception': exc, - 'timeout': settings.EXTERNAL_URL_REACHABLE_TIMEOUT - } - ) - errors = {'href': _('Checking href URL resulted in timeout')} - raise serializers.ValidationError(code='payload', detail=errors) from exc - except requests.ConnectionError as exc: - logger.info( - "Attempted external asset upload resulted in connection error", - extra={ - 'url': url, - 'collection': self.collection, - 'exception': exc, - } - ) - raise serializers.ValidationError(code='payload', detail=unreachable_error) from exc - - def _validate_href_url(self, url): - self._validate_general_url_pattern(url) - self._validate_configured_url_pattern(url) - self._validate_reachability(url) - def _validate_href_field(self, attrs): """Only allow the href field if the collection allows for external assets @@ -495,7 +423,11 @@ def _validate_href_field(self, attrs): errors = {'href': _("Found read-only property in payload")} raise serializers.ValidationError(code="payload", detail=errors) - self._validate_href_url(attrs['file']) + try: + validate_href_url(attrs['file'], collection) + except CoreValidationError as e: + errors = {'href': e.message} + raise serializers.ValidationError(code='payload', detail=errors) def validate(self, attrs): self._validate_href_field(attrs) diff --git a/app/stac_api/validators.py b/app/stac_api/validators.py index 42a54dfb..df397680 100644 --- a/app/stac_api/validators.py +++ b/app/stac_api/validators.py @@ -4,13 +4,17 @@ from datetime import datetime import multihash +import requests from multihash.constants import CODE_HASHES from multihash.constants import HASH_CODES +from django.conf import settings from django.contrib.gis.gdal.error import GDALException from django.contrib.gis.geos import GEOSGeometry from django.contrib.gis.geos.error import GEOSException +from django.core import exceptions from django.core.exceptions import ValidationError +from django.core.validators import URLValidator from django.utils.translation import gettext_lazy as _ from stac_api.utils import fromisoformat @@ -555,3 +559,79 @@ def validate_content_encoding(value): params={'encoding': value, 'supported_encodings': ', '.join(supported_encodings)}, code='invalid' ) + + +def _validate_general_url_pattern(url): + try: + validator = URLValidator() + validator(url) + except exceptions.ValidationError as exc: + error = _('Invalid URL provided') + raise ValidationError(error) from exc + + +def _validate_configured_url_pattern(url, collection): + """Validate the URL against the whitelist""" + whitelist = collection.external_asset_whitelist + + for entry in whitelist: + if url.startswith(entry): + return True + + logger.info( + "Attempted external asset upload didn't match the whitelist", + extra={ + 'url': url, 'whitelist': whitelist, 'collection': collection + } + ) + + # none of the prefixes matches + error = _("Invalid URL provided. It doesn't match the collection whitelist") + raise ValidationError(error) + + +def _validate_reachability(url, collection): + unreachable_error = _('Provided URL is unreachable') + try: + response = requests.head(url, timeout=settings.EXTERNAL_URL_REACHABLE_TIMEOUT) + + if response.status_code > 400: + logger.info( + "Attempted external asset upload failed the reachability check", + extra={ + 'url': url, + 'collection': collection, + 'response': response, + } + ) + raise ValidationError(unreachable_error) + except requests.Timeout as exc: + logger.info( + "Attempted external asset upload resulted in a timeout", + extra={ + 'url': url, + 'collection': collection, + 'exception': exc, + 'timeout': settings.EXTERNAL_URL_REACHABLE_TIMEOUT + } + ) + error = _('Checking href URL resulted in timeout') + raise ValidationError(error) from exc + except requests.ConnectionError as exc: + logger.info( + "Attempted external asset upload resulted in connection error", + extra={ + 'url': url, + 'collection': collection, + 'exception': exc, + } + ) + raise ValidationError(unreachable_error) from exc + + +def validate_href_url(url, collection): + """Validate the href URL """ + + _validate_general_url_pattern(url) + _validate_configured_url_pattern(url, collection) + _validate_reachability(url, collection) diff --git a/app/tests/tests_10/test_external_assets_endpoint.py b/app/tests/tests_10/test_external_assets_endpoint.py index 84975f22..883fc7b0 100644 --- a/app/tests/tests_10/test_external_assets_endpoint.py +++ b/app/tests/tests_10/test_external_assets_endpoint.py @@ -1,12 +1,7 @@ -from parameterized import parameterized - from django.conf import settings from django.test import Client -from rest_framework import serializers - from stac_api.models import Asset -from stac_api.serializers import AssetSerializer from tests.tests_10.base_test import StacBaseTestCase from tests.tests_10.data_factory import Factory @@ -223,40 +218,3 @@ def test_delete_asset_with_external_url(self): ) self.assertStatusCode(200, response) - - @parameterized.expand([ - (['https://test-domain.test'], 'https://test-domain.test/collection/test.jpg', True), - (['https://test-domain'], 'https://test-domain.test/collection/test.jpg', True), - (['https://test-domaine', 'https://test-domain'], - 'https://test-domain.test/collection/test.jpg', - True), # trying to keep the formatting stable - (['https://test-domain.test/collection'], - 'https://test-domain.test/collection/test.jpg', - True), # trying to keep the formatting stable here - (['https://test-domain.tst', 'https://something-else.ch'], - 'https://test-domain.test/collection/test.jpg', - False), - (['http://test-domain.test'], 'https://test-domain.tst/collection/test.jpg', False), - (['https://test-domain.test'], 'http://test-domain.test/collection/test.jpg', False) - ]) - def test_create_external_asset_with_collection_pattern(self, patterns, href, result): - collection = self.collection - - # item = self.item - - class Obj: - - @property - def collection(self): - collection.external_asset_whitelist = patterns - collection.save() - - return collection - - if result: - # pylint: disable=W0212:protected-access - AssetSerializer._validate_configured_url_pattern(Obj(), href) - else: - with self.assertRaises(serializers.ValidationError): - # pylint: disable=W0212:protected-access - AssetSerializer._validate_configured_url_pattern(Obj(), href) diff --git a/app/tests/tests_10/test_validators.py b/app/tests/tests_10/test_validators.py index 3277f7e7..5393d312 100644 --- a/app/tests/tests_10/test_validators.py +++ b/app/tests/tests_10/test_validators.py @@ -1,12 +1,17 @@ +from parameterized import parameterized + from django.core.exceptions import ValidationError from django.test import TestCase from stac_api.validators import MediaType +from stac_api.validators import _validate_configured_url_pattern from stac_api.validators import get_media_type from stac_api.validators import normalize_and_validate_media_type from stac_api.validators import validate_content_encoding from stac_api.validators import validate_item_properties_datetimes_dependencies +from tests.tests_10.data_factory import Factory + class TestValidators(TestCase): @@ -87,3 +92,47 @@ def test_malformed_or_invalid_media_type_strings(self): for media_type_str in media_type_strings: self.assertRaises(ValidationError, normalize_and_validate_media_type, media_type_str) self.assertRaises(KeyError, get_media_type, media_type_str) + + +class TestExternalAssetValidators(TestCase): + + def setUp(self): # pylint: disable=invalid-name + self.factory = Factory() + self.collection = self.factory.create_collection_sample().model + + @parameterized.expand([ + (['https://test-domain.test'], 'https://test-domain.test/collection/test.jpg', True), + (['https://test-domain'], 'https://test-domain.test/collection/test.jpg', True), + (['https://test-domaine', 'https://test-domain'], + 'https://test-domain.test/collection/test.jpg', + True), # trying to keep the formatting stable + (['https://test-domain.test/collection'], + 'https://test-domain.test/collection/test.jpg', + True), # trying to keep the formatting stable here + (['https://test-domain.tst', 'https://something-else.ch'], + 'https://test-domain.test/collection/test.jpg', + False), + (['http://test-domain.test'], 'https://test-domain.tst/collection/test.jpg', False), + (['https://test-domain.test'], 'http://test-domain.test/collection/test.jpg', False) + ]) + def test_create_external_asset_with_collection_pattern(self, patterns, href, result): + collection = self.collection + + # item = self.item + + class Obj: + + @property + def collection(self): + collection.external_asset_whitelist = patterns + collection.save() + + return collection + + if result: + # pylint: disable=W0212:protected-access + _validate_configured_url_pattern(Obj(), href) + else: + with self.assertRaises(ValidationError): + # pylint: disable=W0212:protected-access + _validate_configured_url_pattern(Obj(), href)