diff --git a/bin/wheels/runtime/googleapis_common_protos-1.63.2-py2.py3-none-any.whl b/bin/wheels/runtime/googleapis_common_protos-1.64.0-py2.py3-none-any.whl
similarity index 79%
rename from bin/wheels/runtime/googleapis_common_protos-1.63.2-py2.py3-none-any.whl
rename to bin/wheels/runtime/googleapis_common_protos-1.64.0-py2.py3-none-any.whl
index 23b59dbdba..2a3e3a5882 100644
Binary files a/bin/wheels/runtime/googleapis_common_protos-1.63.2-py2.py3-none-any.whl and b/bin/wheels/runtime/googleapis_common_protos-1.64.0-py2.py3-none-any.whl differ
diff --git a/common.mk b/common.mk
index 190a1a8437..7c27305e84 100644
--- a/common.mk
+++ b/common.mk
@@ -28,23 +28,27 @@ check_python: check_venv
 		echo -e "\nPATH lookup yields a 'pip' executable from outside the virtualenv\n"; \
 		false; \
 	fi
-	@if ! python -c "import sys; sys.exit(0 if '.'.join(map(str, sys.version_info[:3])) == '${azul_python_version}' else 1)"; then \
-		echo -e "\nLooks like Python ${azul_python_version} is not installed or active in the current virtualenv\n"; \
+	@if ! python -c "import sys, os; \
+		p = lambda v: tuple(map(int, v.split('.'))); \
+		v = os.environ['azul_python_version']; \
+		sys.exit(0 if sys.version_info[:3] == p(v) else 1)"; then \
+		echo -e "\nLooks like Python ${azul_python_version} is not installed\n"; \
 		false; \
 	fi
-	@if ! python -c "import sys; exec('try: import chalice\nexcept: sys.exit(1)\nelse: sys.exit(0)')"; then \
+	@if ! python -c "import sys; \
+		exec('try: import chalice\nexcept: sys.exit(1)\nelse: sys.exit(0)')"; then \
 		echo -e "\nLooks like some requirements are missing. Please run 'make requirements'\n"; \
 		false; \
 	fi
 	@if ! python -c "import sys, wheel as w; \
-		from pkg_resources import parse_version as p; \
-		sys.exit(0 if p(w.__version__) >= p('0.32.3') else 1)"; then \
+		p = lambda v: tuple(map(int, v.split('.'))); \
+		sys.exit(0 if p(w.__version__) >= p('0.32.3') else 1)"; then \
 		echo -e "\nLooks like the `wheel` package is outdated or missing. See README for instructions on how to fix this.\n"; \
 		false; \
 	fi
 	@if ! python -c "import sys; \
 		from chalice import chalice_version as v; \
-		from pkg_resources import parse_version as p; \
+		p = lambda v: tuple(map(int, v.split('.'))); \
 		sys.exit(0 if p(v) == p('1.30.0') else 1)"; then \
 		echo -e "\nLooks like chalice is out of date. Please run 'make requirements'\n"; \
 		false; \
@@ -74,10 +78,13 @@ check_docker:
 
 .PHONY: check_aws
 check_aws: check_python
 	@if ! python -c "import os, sys, boto3 as b; \
-		sys.exit(0 if os.environ.get('TRAVIS') == 'true' or \
-		b.client('sts').get_caller_identity()['Account'] == os.environ['AZUL_AWS_ACCOUNT_ID'] else 1)"; then \
-		echo -e "\nLooks like there is a mismatch between AZUL_AWS_ACCOUNT_ID and the currently active AWS credentials. \
-		\nCheck the output from 'aws sts get-caller-identity' against the value of that environment variable.\n"; \
+		expected = os.environ['AZUL_AWS_ACCOUNT_ID']; \
+		actual = b.client('sts').get_caller_identity()['Account']; \
+		sys.exit(0 if actual == expected else 1)"; then \
+		echo Looks like there is a mismatch between AZUL_AWS_ACCOUNT_ID \
+		and the currently active AWS credentials. ; \
+		echo Check the output from \'aws sts get-caller-identity\' against the \
+		value of that environment variable. ; \
 		false; \
 	fi
diff --git a/requirements.all.txt b/requirements.all.txt
index 23f3b9581d..13562935c7 100644
--- a/requirements.all.txt
+++ b/requirements.all.txt
@@ -10,7 +10,7 @@ blinker==1.8.2
 boto3==1.28.63
 boto3-stubs==1.28.63
 botocore==1.31.63
-botocore-stubs==1.35.5
+botocore-stubs==1.35.6
 brotli==1.1.0
 cachetools==5.5.0
 certifi==2024.7.4
@@ -49,7 +49,7 @@ google-cloud-core==2.4.1
 google-cloud-storage==2.12.0
 google-crc32c==1.5.0
 google-resumable-media==2.7.2
-googleapis-common-protos==1.63.2
+googleapis-common-protos==1.64.0
 greenlet==3.0.3
 grpcio==1.66.0
 grpcio-status==1.62.3
@@ -103,7 +103,7 @@ pygithub==1.56
 pyjwt==2.9.0
 pynacl==1.5.0
 pyopenssl==24.2.1
-pyparsing==3.1.2
+pyparsing==3.1.4
 pyrsistent==0.20.0
 python-dateutil==2.9.0.post0
 python-dxf==11.4.0
@@ -142,4 +142,4 @@ wrapt==1.16.0
 www-authenticate==0.9.2
 xmltodict==0.13.0
 zope.event==5.0
-zope.interface==7.0.1
+zope.interface==7.0.2
diff --git a/requirements.dev.trans.txt b/requirements.dev.trans.txt
index ddc62c0638..8b667226c9 100644
--- a/requirements.dev.trans.txt
+++ b/requirements.dev.trans.txt
@@ -1,6 +1,6 @@
 blessed==1.20.0
 blinker==1.8.2
-botocore-stubs==1.35.5
+botocore-stubs==1.35.6
 brotli==1.1.0
 click==8.1.7
 colorama==0.4.4
@@ -41,7 +41,7 @@ pycodestyle==2.9.1
 pyflakes==2.5.0
 pyjwt==2.9.0
 pynacl==1.5.0
-pyparsing==3.1.2
+pyparsing==3.1.4
 pyrsistent==0.20.0
 python-editor==1.0.4
 pyzmq==26.2.0
@@ -59,4 +59,4 @@ wcwidth==0.2.13
 www-authenticate==0.9.2
 xmltodict==0.13.0
 zope.event==5.0
-zope.interface==7.0.1
+zope.interface==7.0.2
diff --git a/requirements.trans.txt b/requirements.trans.txt
index f82796d287..82a1e32b76 100644
--- a/requirements.trans.txt
+++ b/requirements.trans.txt
@@ -8,7 +8,7 @@ cryptography==43.0.0
 google-cloud-core==2.4.1
 google-crc32c==1.5.0
 google-resumable-media==2.7.2
-googleapis-common-protos==1.63.2
+googleapis-common-protos==1.64.0
 grpcio==1.66.0
 grpcio-status==1.62.3
 http_sfv==0.9.9
diff --git a/src/azul/service/async_manifest_service.py b/src/azul/service/async_manifest_service.py
index 53ca8fe0e2..b35fdc67ac 100644
--- a/src/azul/service/async_manifest_service.py
+++ b/src/azul/service/async_manifest_service.py
@@ -120,32 +120,37 @@ def start_generation(self, execution_id: bytes, input: JSON) -> Token:
         execution_arn = self.execution_arn(execution_name)
         # The input contains the verbatim manifest key as JSON while the ARN
         # contains the encoded hash of the key so this log line is useful for
-        # associating the hash with the key for diagnostic purposes
+        # associating the hash with the key for diagnostic purposes.
         log.info('Starting execution %r for input %r', execution_arn, input)
         token = Token.first(execution_id)
-        input = json.dumps(input)
         try:
             # If there already is an execution of the given name, and if that
             # execution is still ongoing and was given the same input as what we
-            # pass here, `start_execution` will succeed idempotently
+            # pass here, `start_execution` will succeed idempotently.
             execution = self._sfn.start_execution(stateMachineArn=self.machine_arn,
                                                   name=execution_name,
-                                                  input=input)
+                                                  input=json.dumps(input))
         except self._sfn.exceptions.ExecutionAlreadyExists:
             # This exception indicates that there is already an execution with
             # the given name but that it has ended, or that its input differs
-            # from what we were passing now. The latter case is unexpected and
-            # therefore constitues an error. In the former case we return the
-            # token so that the client has to make another request to actually
-            # obtain the resulting manifest. Strictly speaking, we could return
-            # the manifest here, but it keeps the control flow simpler. This
-            # benevolent race is rare enough to not worry about optimizing.
+            # from what we were passing just now. The latter case is unexpected
+            # because any part of the input that affects the output is covered
+            # in the manifest hash and therefore the execution name. Any part of
+            # the input not affecting the output is constant and can only change
+            # with the source code which would have resulted in a different
+            # execution name.
+            #
+            # In the former case we return the token so that the client has to
+            # make another request to actually obtain the resulting manifest.
+            # Strictly speaking, we could return the manifest here, but it keeps
+            # the control flow simpler. This benevolent race is not probable
+            # enough to warrant an optimization.
             execution = self._sfn.describe_execution(executionArn=execution_arn)
-            if execution['input'] != input:
-                raise InvalidGeneration(token)
-            else:
+            if input == json.loads(execution['input']):
                 log.info('A completed execution %r already exists', execution_arn)
                 return token
+            else:
+                raise InvalidGeneration(token)
         else:
             assert execution_arn == execution['executionArn'], execution
             log.info('Started execution %r or it was already running', execution_arn)
diff --git a/src/azul/service/manifest_controller.py b/src/azul/service/manifest_controller.py
index 9b22168400..054bad22c3 100644
--- a/src/azul/service/manifest_controller.py
+++ b/src/azul/service/manifest_controller.py
@@ -149,11 +149,11 @@ def get_manifest_async(self,
                 }
                 # Manifest keys for catalogs with long names would be too
                 # long to be used directly as state machine execution names.
-                execution_key = manifest_key.hash
+                execution_id = manifest_key.hash
                 # ManifestGenerationState is also JSON but there is no way
                 # to express that since TypedDict rejects a co-parent class.
                 input: JSON = cast(JSON, state)
-                token = self.async_service.start_generation(execution_key, input)
+                token = self.async_service.start_generation(execution_id, input)
             else:
                 manifest_key = manifest.manifest_key
         else:
diff --git a/test/service/test_manifest_async.py b/test/service/test_manifest_async.py
index ae8052e77f..e328310a62 100644
--- a/test/service/test_manifest_async.py
+++ b/test/service/test_manifest_async.py
@@ -154,22 +154,32 @@ def lambda_name(cls) -> str:
     @mock.patch.object(AsyncManifestService, '_sfn')
     @mock.patch.object(ManifestService, 'get_manifest')
     @mock.patch.object(ManifestService, 'get_cached_manifest')
-    def test(self, get_cached_manifest, get_manifest, _sfn):
+    @mock.patch.object(ManifestService, 'verify_manifest_key')
+    @mock.patch.object(ManifestService, 'get_cached_manifest_with_key')
+    def test(self,
+             get_cached_manifest_with_key,
+             verify_manifest_key,
+             get_cached_manifest,
+             get_manifest,
+             _sfn):
         with responses.RequestsMock() as helper:
             helper.add_passthru(str(self.base_url))
             for fetch in (True, False):
                 with self.subTest(fetch=fetch):
                     format = ManifestFormat.compact
-                    filters = Filters(explicit={'organ': {'is': ['lymph node']}},
-                                      source_ids={self.source.id})
+                    filters = {'organ': {'is': ['lymph node']}, 'fileFormat': {'is': ['txt']}}
+                    filters = Filters(explicit=filters, source_ids={self.source.id})
                     params = {
                         'catalog': self.catalog,
                         'format': format.value,
                         'filters': json.dumps(filters.explicit)
                     }
                     path = '/manifest/files'
-                    object_url = 'https://url.to.manifest?foo=bar'
-                    file_name = 'some_file_name'
+
+                    initial_url = self.base_url.set(path=path, args=params)
+                    if fetch:
+                        initial_url.path.segments.insert(0, 'fetch')
+
                     manifest_key = ManifestKey(catalog=self.catalog,
                                                format=format,
                                                manifest_hash=UUID('d2b0ce3c-46f0-57fe-b9d4-2e38d8934fd4'),
@@ -180,14 +190,14 @@ def test(self, get_cached_manifest, get_manifest, _sfn):
 
                     manifest_url = self.base_url.set(path=path)
                     manifest_url.path.segments.append(signed_manifest_key.encode())
+
+                    object_url = 'https://url.to.manifest?foo=bar'
+                    file_name = 'some_file_name'
                     manifest = Manifest(location=object_url,
                                         was_cached=False,
                                         format=format,
                                         manifest_key=manifest_key,
                                         file_name=file_name)
-                    url = self.base_url.set(path=path, args=params)
-                    if fetch:
-                        url.path.segments.insert(0, 'fetch')
 
                     partitions = (
                         ManifestPartition(index=0,
@@ -209,6 +219,9 @@ def test(self, get_cached_manifest, get_manifest, _sfn):
                                           is_last_page=False,
                                           search_after=('foo', 'doc#bar'))
                     )
+                    input: ManifestGenerationState = dict(filters=filters.to_json(),
+                                                          manifest_key=manifest_key.to_json(),
+                                                          partition=partitions[0].to_json())
                     service: AsyncManifestService
                     service = self.app_module.app.manifest_controller.async_service
                     execution_id = manifest_key.hash
@@ -219,6 +232,7 @@ def test(self, get_cached_manifest, get_manifest, _sfn):
                         'executionArn': execution_arn,
                         'startDate': 123
                     }
+                    url = initial_url
                     for i, expected_status in enumerate(3 * [301] + [302]):
                         response = requests.request(method='PUT' if i == 0 else 'GET',
                                                     url=str(url),
@@ -235,17 +249,16 @@ def test(self, get_cached_manifest, get_manifest, _sfn):
                             self.assertGreaterEqual(int(headers['Retry-After']), 0)
                             url = furl(headers['Location'])
                         if i == 0:
-                            state: ManifestGenerationState = dict(filters=filters.to_json(),
-                                                                  manifest_key=manifest_key.to_json(),
-                                                                  partition=partitions[0].to_json())
+                            state: ManifestGenerationState = input
                             _sfn.start_execution.assert_called_once_with(
                                 stateMachineArn=machine_arn,
                                 name=execution_name,
-                                input=json.dumps(state)
+                                input=json.dumps(input)
                             )
                             _sfn.describe_execution.assert_not_called()
                             _sfn.reset_mock()
                             _sfn.describe_execution.return_value = {'status': 'RUNNING'}
+                            token_url = url
                         elif i == 1:
                             get_manifest.return_value = partitions[1]
                             state = self.app_module.generate_manifest(state, None)
@@ -271,6 +284,7 @@ def test(self, get_cached_manifest, get_manifest, _sfn):
                             _sfn.reset_mock()
                             _sfn.describe_execution.return_value = {
                                 'status': 'SUCCEEDED',
+                                'input': json.dumps(input),
                                 'output': json.dumps(
                                     self.app_module.generate_manifest(state, None)
                                 )
                             }
@@ -290,34 +304,53 @@ def test(self, get_cached_manifest, get_manifest, _sfn):
                     expected_url = str(manifest_url) if expect_redirect else object_url
                     self.assertEqual(expected_url, str(url))
                     _sfn.reset_mock()
-                    mock_effects = [
-                        manifest,
-                        CachedManifestNotFound(manifest_key)
-                    ]
-                    with (
-                        mock.patch.object(ManifestService,
-                                          'get_cached_manifest_with_key',
-                                          side_effect=mock_effects),
-                        mock.patch.object(ManifestService,
-                                          'verify_manifest_key',
-                                          return_value=manifest_key)
-                    ):
-                        for mock_effect in mock_effects:
-                            with self.subTest(mock_effect=mock_effect):
-                                assert signed_manifest_key.encode() == manifest_url.path.segments[-1]
-                                response = requests.get(str(manifest_url), allow_redirects=False)
-                                if isinstance(mock_effect, Manifest):
-                                    self.assertEqual(302, response.status_code)
-                                    self.assertEqual(object_url, response.headers['Location'])
-                                elif isinstance(mock_effect, CachedManifestNotFound):
-                                    self.assertEqual(410, response.status_code)
-                                    expected_response = {
-                                        'Code': 'GoneError',
-                                        'Message': 'The requested manifest has expired, please request a new one'
-                                    }
-                                    self.assertEqual(expected_response, response.json())
-                                else:
-                                    assert False, mock_effect
+
+                    # Repeat the initial request while continuing to mock the
+                    # absence of the manifest. The SFN execution is complete so
+                    # this simulates an expired manifest. To satisfy the
+                    # request, a new generation has to be started, and the
+                    # response should be 301 redirect for the new generation.
+                    exception = self._mock_sfn_exception(_sfn,
+                                                         operation_name='StartExecution',
+                                                         error_code='ExecutionAlreadyExists')
+                    _sfn.start_execution.side_effect = exception
+
+                    # Introduce an insignificant difference in the SFN input by
+                    # reordering the `filters` dictionary entries. The repeated
+                    # request should be considered valid and matching the completed
+                    # step function execution.
+                    url = initial_url.copy()
+                    filters = json.loads(url.args['filters'])
+                    url.args['filters'] = json.dumps(dict(reversed(filters.items())))
+
+                    response = requests.put(url=str(url), allow_redirects=False)
+                    _sfn.reset_mock(side_effect=True)
+                    if fetch:
+                        self.assertEqual(200, response.status_code)
+                        self.assertEqual(301, response.json()['Status'])
+                    else:
+                        self.assertEqual(301, response.status_code)
+                    # FIXME: 404 from S3 when re-requesting manifest after it expired
+                    #        https://github.com/DataBiosphere/azul/issues/6441
+                    if False:
+                        self.assertNotEqual(token_url, response.json()['Location'])
+
+                    assert signed_manifest_key.encode() == manifest_url.path.segments[-1]
+                    assert verify_manifest_key.not_called
+                    verify_manifest_key.return_value = manifest_key
+                    assert get_cached_manifest_with_key.not_called
+                    get_cached_manifest_with_key.return_value = manifest
+                    response = requests.get(str(manifest_url), allow_redirects=False)
+                    self.assertEqual(302, response.status_code)
+                    self.assertEqual(object_url, response.headers['Location'])
+                    get_cached_manifest_with_key.side_effect = CachedManifestNotFound(manifest_key)
+                    response = requests.get(str(manifest_url), allow_redirects=False)
+                    self.assertEqual(410, response.status_code)
+                    expected_response = {
+                        'Code': 'GoneError',
+                        'Message': 'The requested manifest has expired, please request a new one'
+                    }
+                    self.assertEqual(expected_response, response.json())
 
                     token = Token.first(execution_id).encode()
@@ -328,19 +361,29 @@ def _test(self, *, expected_status, token=token):
 
     @contextmanager
     def _mock_error(self, error_code: str) -> ContextManager:
-        exception_cls = type(error_code, (ClientError,), {})
         with patch.object(AsyncManifestService, '_sfn') as _sfn:
-            setattr(_sfn.exceptions, error_code, exception_cls)
-            error_response = {
-                'Error': {
-                    'Code': error_code
-                }
-            }
-            exception = exception_cls(operation_name='DescribeExecution',
-                                      error_response=error_response)
+            exception = self._mock_sfn_exception(_sfn,
+                                                 operation_name='DescribeExecution',
+                                                 error_code=error_code)
             _sfn.describe_execution.side_effect = exception
             yield
 
+    def _mock_sfn_exception(self,
+                            _sfn: mock.MagicMock,
+                            operation_name: str,
+                            error_code: str
+                            ) -> Exception:
+        exception_cls = type(error_code, (ClientError,), {})
+        setattr(_sfn.exceptions, error_code, exception_cls)
+        error_response = {
+            'Error': {
+                'Code': error_code
+            }
+        }
+        exception = exception_cls(operation_name=operation_name,
+                                  error_response=error_response)
+        return exception
+
     def test_execution_not_found(self):
         """
         Manifest status check should raise a BadRequestError (400 status code)
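
Note on the version checks in common.mk: the patch replaces pkg_resources.parse_version with an inline lambda that parses a version string into a tuple of integers. That is sufficient for the plain dot-separated numeric versions compared in this file, though unlike parse_version it would reject pre-release suffixes. A minimal sketch of the comparison, using illustrative values only:

    # Tuple-based version comparison as inlined in common.mk.
    # Assumes plain dot-separated numeric versions such as '1.30.0'.
    def parse(v: str) -> tuple:
        return tuple(map(int, v.split('.')))

    assert parse('0.43.0') >= parse('0.32.3')    # shape of the wheel check
    assert parse('1.30.0') == parse('1.30.0')    # shape of the chalice check
    # parse('1.0rc1') raises ValueError, unlike pkg_resources.parse_version.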
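
Note on the idempotency check in start_generation: the stored execution input is now compared as parsed JSON (input == json.loads(execution['input'])) rather than as a raw string, so a resubmitted request whose input is semantically equal still matches the completed execution even if key order differs. The test exercises this by reversing the filters dictionary entries. A small sketch with illustrative values:

    import json

    stored = json.dumps({'organ': {'is': ['lymph node']}, 'fileFormat': {'is': ['txt']}})
    resubmitted = {'fileFormat': {'is': ['txt']}, 'organ': {'is': ['lymph node']}}

    assert json.dumps(resubmitted) != stored     # raw string comparison is order-sensitive
    assert resubmitted == json.loads(stored)     # dict comparison is not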