diff --git a/infrastructure/constructs/processing.py b/infrastructure/constructs/processing.py
index 4ddcf0085..4662432ca 100644
--- a/infrastructure/constructs/processing.py
+++ b/infrastructure/constructs/processing.py
@@ -441,7 +441,9 @@ def __init__(
         # STATE MACHINE
         dataset_version_creation_definition = (
             check_stac_metadata_task.add_catch(
-                errors=[Errors.TASKS_FAILED], handler=validation_summary_task
+                errors=[Errors.TASKS_FAILED],
+                handler=validation_summary_task,
+                result_path="$.error-info",
             )
             .next(content_iterator_task)
             .next(
diff --git a/tests/test_processing_stack.py b/tests/test_processing_stack.py
index b94d7d91e..3530117fe 100644
--- a/tests/test_processing_stack.py
+++ b/tests/test_processing_stack.py
@@ -9,6 +9,7 @@
 import smart_open
 from mypy_boto3_lambda import LambdaClient
+from mypy_boto3_lambda.type_defs import InvocationResponseTypeDef
 from mypy_boto3_s3 import S3Client
 from mypy_boto3_s3control import S3ControlClient
 from mypy_boto3_ssm import SSMClient
@@ -187,35 +188,14 @@ def should_successfully_run_dataset_version_creation_process_with_single_asset(
 
     # When
     try:
-
-        dataset_response = lambda_client.invoke(
-            FunctionName=Resource.DATASETS_ENDPOINT_FUNCTION_NAME.resource_name,
-            Payload=dumps(
-                {
-                    HTTP_METHOD_KEY: "POST",
-                    BODY_KEY: {
-                        DATASET_TITLE_KEY: dataset_title,
-                        DESCRIPTION_KEY: any_dataset_description(),
-                    },
-                }
-            ).encode(),
-        )
+        dataset_response = invoke_dataset_lambda_function(lambda_client, dataset_title)
         dataset_payload = load(dataset_response["Payload"])
         dataset_id = dataset_payload[BODY_KEY][DATASET_ID_SHORT_KEY]
 
-        dataset_versions_response = lambda_client.invoke(
-            FunctionName=Resource.DATASET_VERSIONS_ENDPOINT_FUNCTION_NAME.resource_name,
-            Payload=dumps(
-                {
-                    HTTP_METHOD_KEY: "POST",
-                    BODY_KEY: {
-                        DATASET_ID_SHORT_KEY: dataset_id,
-                        METADATA_URL_KEY: root_metadata_file.url,
-                        S3_ROLE_ARN_KEY: get_s3_role_arn(),
-                    },
-                }
-            ).encode(),
+        dataset_versions_response = invoke_dataset_version_lambda_function(
+            lambda_client, dataset_id, root_metadata_file.url
         )
+
         dataset_versions_payload = load(dataset_versions_response["Payload"])
 
         with subtests.test(msg="Dataset Versions endpoint returns success"):
@@ -811,6 +791,136 @@ def should_successfully_run_dataset_version_creation_process_and_again_with_part
         )
 
 
+@mark.infrastructure
+def should_end_step_function_successfully_when_non_collection_or_catalog_submitted(
+    lambda_client: LambdaClient,
+    step_functions_client: SFNClient,
+    subtests: SubTests,
+) -> None:
+
+    # pylint:disable=too-many-locals
+    key_prefix = any_safe_file_path()
+    dataset_title = any_dataset_title()
+    first_asset_contents = any_file_contents()
+    first_asset_name = any_asset_name()
+    first_asset_hex_digest = sha256_hex_digest_to_multihash(
+        sha256(first_asset_contents).hexdigest()
+    )
+    first_asset_created = any_past_datetime_string()
+    first_asset_updated = any_past_datetime_string()
+    item_metadata_filename = any_safe_filename()
+
+    first_asset_filename = any_safe_filename()
+    metadata_url_prefix = (
+        f"{S3_URL_PREFIX}{Resource.STAGING_BUCKET_NAME.resource_name}/{key_prefix}"
+    )
+    collection_metadata_filename = any_safe_filename()
+    catalog_metadata_filename = any_safe_filename()
+
+    collection_metadata_url = f"{metadata_url_prefix}/{collection_metadata_filename}"
+    catalog_metadata_url = f"{metadata_url_prefix}/{catalog_metadata_filename}"
+    item_metadata_url = f"{metadata_url_prefix}/{item_metadata_filename}"
+
+    with S3Object(
+        file_object=BytesIO(initial_bytes=first_asset_contents),
+        bucket_name=Resource.STAGING_BUCKET_NAME.resource_name,
+        key=f"{key_prefix}/{first_asset_filename}",
+    ) as first_asset_s3_object, S3Object(
+        file_object=json_dict_to_file_object(
+            {
+                **deepcopy(MINIMAL_VALID_STAC_ITEM_OBJECT),
+                STAC_ASSETS_KEY: {
+                    first_asset_name: {
+                        LINZ_STAC_CREATED_KEY: first_asset_created,
+                        LINZ_STAC_UPDATED_KEY: first_asset_updated,
+                        STAC_HREF_KEY: first_asset_s3_object.url,
+                        STAC_FILE_CHECKSUM_KEY: first_asset_hex_digest,
+                    }
+                },
+                STAC_LINKS_KEY: [
+                    {
+                        STAC_HREF_KEY: catalog_metadata_url,
+                        STAC_REL_KEY: STAC_REL_ROOT,
+                        STAC_TYPE_KEY: STAC_MEDIA_TYPE_JSON,
+                    },
+                    {
+                        STAC_HREF_KEY: collection_metadata_url,
+                        STAC_REL_KEY: STAC_REL_PARENT,
+                        STAC_TYPE_KEY: STAC_MEDIA_TYPE_JSON,
+                    },
+                    {
+                        STAC_HREF_KEY: item_metadata_url,
+                        STAC_REL_KEY: STAC_REL_SELF,
+                        STAC_TYPE_KEY: STAC_MEDIA_TYPE_JSON,
+                    },
+                ],
+            }
+        ),
+        bucket_name=Resource.STAGING_BUCKET_NAME.resource_name,
+        key=f"{key_prefix}/{item_metadata_filename}",
+    ) as s3_metadata_file:
+        dataset_response = invoke_dataset_lambda_function(lambda_client, dataset_title)
+        dataset_payload = load(dataset_response["Payload"])
+        dataset_id = dataset_payload[BODY_KEY][DATASET_ID_SHORT_KEY]
+
+        # When creating a dataset version
+        dataset_version_creation_response = invoke_dataset_version_lambda_function(
+            lambda_client, dataset_id, s3_metadata_file.url
+        )
+        response_payload = load(dataset_version_creation_response["Payload"])
+        with subtests.test(msg="Dataset Versions endpoint status code"):
+            assert response_payload.get(STATUS_CODE_KEY) == HTTPStatus.CREATED, response_payload
+        dataset_versions_body = response_payload[BODY_KEY]
+
+        with subtests.test(msg="Step function result"):
+            # Then poll for State Machine State
+            execution_arn = dataset_versions_body[EXECUTION_ARN_KEY]
+            while (
+                execution := step_functions_client.describe_execution(
+                    executionArn=execution_arn
+                )
+            )["status"] == "RUNNING":
+
+                sleep(5)  # pragma: no cover
+
+            assert execution["status"] == "SUCCEEDED", execution
+
+
+def invoke_dataset_version_lambda_function(
+    lambda_client: LambdaClient, dataset_id: str, metadata_url: str
+) -> InvocationResponseTypeDef:
+    return lambda_client.invoke(
+        FunctionName=Resource.DATASET_VERSIONS_ENDPOINT_FUNCTION_NAME.resource_name,
+        Payload=dumps(
+            {
+                HTTP_METHOD_KEY: "POST",
+                BODY_KEY: {
+                    DATASET_ID_SHORT_KEY: dataset_id,
+                    METADATA_URL_KEY: metadata_url,
+                    S3_ROLE_ARN_KEY: get_s3_role_arn(),
+                },
+            }
+        ).encode(),
+    )
+
+
+def invoke_dataset_lambda_function(
+    lambda_client: LambdaClient, dataset_title: str
+) -> InvocationResponseTypeDef:
+    return lambda_client.invoke(
+        FunctionName=Resource.DATASETS_ENDPOINT_FUNCTION_NAME.resource_name,
+        Payload=dumps(
+            {
+                HTTP_METHOD_KEY: "POST",
+                BODY_KEY: {
+                    DATASET_TITLE_KEY: dataset_title,
+                    DESCRIPTION_KEY: any_dataset_description(),
+                },
+            }
+        ).encode(),
+    )
+
+
 @mark.infrastructure
 def should_not_copy_files_when_there_is_a_checksum_mismatch(
     lambda_client: LambdaClient,
@@ -849,34 +959,13 @@
         key=f"{key_prefix}/{metadata_filename}",
     ) as s3_metadata_file:
 
-        dataset_response = lambda_client.invoke(
-            FunctionName=Resource.DATASETS_ENDPOINT_FUNCTION_NAME.resource_name,
-            Payload=dumps(
-                {
-                    HTTP_METHOD_KEY: "POST",
-                    BODY_KEY: {
-                        DATASET_TITLE_KEY: dataset_title,
-                        DESCRIPTION_KEY: any_dataset_description(),
-                    },
-                }
-            ).encode(),
-        )
+        dataset_response = invoke_dataset_lambda_function(lambda_client, dataset_title)
         dataset_payload = load(dataset_response["Payload"])
         dataset_id = dataset_payload[BODY_KEY][DATASET_ID_SHORT_KEY]
 
         # When creating a dataset version
-        dataset_version_creation_response = lambda_client.invoke(
-            FunctionName=Resource.DATASET_VERSIONS_ENDPOINT_FUNCTION_NAME.resource_name,
-            Payload=dumps(
-                {
-                    HTTP_METHOD_KEY: "POST",
-                    BODY_KEY: {
-                        DATASET_ID_SHORT_KEY: dataset_id,
-                        METADATA_URL_KEY: s3_metadata_file.url,
-                        S3_ROLE_ARN_KEY: get_s3_role_arn(),
-                    },
-                }
-            ).encode(),
+        dataset_version_creation_response = invoke_dataset_version_lambda_function(
+            lambda_client, dataset_id, s3_metadata_file.url
         )
         response_payload = load(dataset_version_creation_response["Payload"])
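
Note on the `add_catch` change in `infrastructure/constructs/processing.py`: in Step Functions, the `ResultPath` of a Catch controls what the recovery state receives. Without it, the error object replaces the state input entirely; with `result_path="$.error-info"`, the original input to `check_stac_metadata_task` is preserved and the error object is attached under the `error-info` key, so `validation_summary_task` still sees the payload it needs. Below is a minimal CDK v1 sketch of the same pattern; the stack, Lambda function, and state names are hypothetical stand-ins, and it uses the CDK's built-in `sfn.Errors.TASKS_FAILED` where the repository imports its own `Errors` constant.

```python
# A minimal CDK v1 sketch of the Catch/result_path pattern used above.
# Everything here is a hypothetical stand-in (stack, Lambda, state names);
# only the add_catch(..., result_path=...) call mirrors the diff.
from aws_cdk import core
from aws_cdk import aws_lambda as lambda_
from aws_cdk import aws_stepfunctions as sfn
from aws_cdk import aws_stepfunctions_tasks as sfn_tasks


class CatchSketchStack(core.Stack):
    def __init__(self, scope: core.Construct, construct_id: str) -> None:
        super().__init__(scope, construct_id)

        # Stand-in for the real check_stac_metadata task.
        check_function = lambda_.Function(
            self,
            "check-stac-metadata-function",
            runtime=lambda_.Runtime.PYTHON_3_8,
            handler="index.handler",
            code=lambda_.Code.from_inline("def handler(event, _context):\n    return event"),
        )
        check_stac_metadata_task = sfn_tasks.LambdaInvoke(
            self, "check-stac-metadata", lambda_function=check_function
        )
        validation_summary_task = sfn.Pass(self, "validation-summary")

        # Without result_path a caught error replaces the state input; with
        # result_path the original input is kept and the error object is
        # attached under "error-info", so the summary state can still read it.
        definition = check_stac_metadata_task.add_catch(
            validation_summary_task,
            errors=[sfn.Errors.TASKS_FAILED],  # the repository imports its own Errors constant
            result_path="$.error-info",
        ).next(sfn.Succeed(self, "done"))

        sfn.StateMachine(self, "dataset-version-creation-sketch", definition=definition)
```

At runtime the summary handler's event would then look roughly like its original input plus an `error-info` member containing the standard `Error` and `Cause` fields.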