fix: Allow step function to keep the payload when exception thrown (#2126)

* fix: stop the step function from discarding the payload when an exception is thrown

* fix: add a test validating that the step function succeeds when a wrong item file is submitted

* fix: change parameter order

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
AmrouEmad and kodiakhq[bot] authored Oct 11, 2022
1 parent 28290fb commit 6a2bc9b
Showing 2 changed files with 141 additions and 50 deletions.
4 changes: 3 additions & 1 deletion infrastructure/constructs/processing.py
@@ -441,7 +441,9 @@ def __init__(
         # STATE MACHINE
         dataset_version_creation_definition = (
             check_stac_metadata_task.add_catch(
-                errors=[Errors.TASKS_FAILED], handler=validation_summary_task
+                errors=[Errors.TASKS_FAILED],
+                handler=validation_summary_task,
+                result_path="$.error-info",
             )
             .next(content_iterator_task)
             .next(
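The change above relies on Step Functions' ResultPath semantics: with the default ResultPath of "$", a Catch replaces the whole state input with the error object, so the handler never sees the original payload; an explicit result_path such as "$.error-info" nests the error inside that input instead. A minimal sketch of those semantics in plain Python (no AWS access needed; apply_result_path and the sample payload keys are illustrative, not part of this repository):

import json

def apply_result_path(state_input: dict, error: dict, result_path: str) -> dict:
    # Mimic the Catch ResultPath behaviour for the simple "$" / "$.key" cases.
    if result_path == "$":
        return error  # default: the error object replaces the entire payload
    key = result_path[2:]  # strip the leading "$." to get the field name
    merged = dict(state_input)
    merged[key] = error  # error is nested; the rest of the payload survives
    return merged

payload = {"dataset_id": "abc123", "metadata_url": "s3://staging/item.json"}
error = {"Error": "States.TaskFailed", "Cause": "validation failed"}

print(json.dumps(apply_result_path(payload, error, "$"), indent=2))
# Before the fix: only the error reaches the handler.
print(json.dumps(apply_result_path(payload, error, "$.error-info"), indent=2))
# After the fix: the payload survives, with the error under "error-info".

This is why validation_summary_task can now read both the original dataset payload and the failure details.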
187 changes: 138 additions & 49 deletions tests/test_processing_stack.py
@@ -9,6 +9,7 @@

 import smart_open
 from mypy_boto3_lambda import LambdaClient
+from mypy_boto3_lambda.type_defs import InvocationResponseTypeDef
 from mypy_boto3_s3 import S3Client
 from mypy_boto3_s3control import S3ControlClient
 from mypy_boto3_ssm import SSMClient
@@ -187,35 +188,14 @@ def should_successfully_run_dataset_version_creation_process_with_single_asset(

     # When
     try:
-
-        dataset_response = lambda_client.invoke(
-            FunctionName=Resource.DATASETS_ENDPOINT_FUNCTION_NAME.resource_name,
-            Payload=dumps(
-                {
-                    HTTP_METHOD_KEY: "POST",
-                    BODY_KEY: {
-                        DATASET_TITLE_KEY: dataset_title,
-                        DESCRIPTION_KEY: any_dataset_description(),
-                    },
-                }
-            ).encode(),
-        )
+        dataset_response = invoke_dataset_lambda_function(lambda_client, dataset_title)
         dataset_payload = load(dataset_response["Payload"])
         dataset_id = dataset_payload[BODY_KEY][DATASET_ID_SHORT_KEY]

-        dataset_versions_response = lambda_client.invoke(
-            FunctionName=Resource.DATASET_VERSIONS_ENDPOINT_FUNCTION_NAME.resource_name,
-            Payload=dumps(
-                {
-                    HTTP_METHOD_KEY: "POST",
-                    BODY_KEY: {
-                        DATASET_ID_SHORT_KEY: dataset_id,
-                        METADATA_URL_KEY: root_metadata_file.url,
-                        S3_ROLE_ARN_KEY: get_s3_role_arn(),
-                    },
-                }
-            ).encode(),
+        dataset_versions_response = invoke_dataset_version_lambda_function(
+            lambda_client, dataset_id, root_metadata_file.url
         )

         dataset_versions_payload = load(dataset_versions_response["Payload"])

         with subtests.test(msg="Dataset Versions endpoint returns success"):
@@ -811,6 +791,136 @@ def should_successfully_run_dataset_version_creation_process_and_again_with_part
     )


+@mark.infrastructure
+def should_end_step_function_successfully_when_non_collection_or_catalog_submitted(
+    lambda_client: LambdaClient,
+    step_functions_client: SFNClient,
+    subtests: SubTests,
+) -> None:
+
+    # pylint:disable=too-many-locals
+    key_prefix = any_safe_file_path()
+    dataset_title = any_dataset_title()
+    first_asset_contents = any_file_contents()
+    first_asset_name = any_asset_name()
+    first_asset_hex_digest = sha256_hex_digest_to_multihash(
+        sha256(first_asset_contents).hexdigest()
+    )
+    first_asset_created = any_past_datetime_string()
+    first_asset_updated = any_past_datetime_string()
+    item_metadata_filename = any_safe_filename()
+
+    first_asset_filename = any_safe_filename()
+    metadata_url_prefix = (
+        f"{S3_URL_PREFIX}{Resource.STAGING_BUCKET_NAME.resource_name}/{key_prefix}"
+    )
+    collection_metadata_filename = any_safe_filename()
+    catalog_metadata_filename = any_safe_filename()
+
+    collection_metadata_url = f"{metadata_url_prefix}/{collection_metadata_filename}"
+    catalog_metadata_url = f"{metadata_url_prefix}/{catalog_metadata_filename}"
+    item_metadata_url = f"{metadata_url_prefix}/{item_metadata_filename}"
+
+    with S3Object(
+        file_object=BytesIO(initial_bytes=first_asset_contents),
+        bucket_name=Resource.STAGING_BUCKET_NAME.resource_name,
+        key=f"{key_prefix}/{first_asset_filename}",
+    ) as second_asset_s3_object, S3Object(
+        file_object=json_dict_to_file_object(
+            {
+                **deepcopy(MINIMAL_VALID_STAC_ITEM_OBJECT),
+                STAC_ASSETS_KEY: {
+                    first_asset_name: {
+                        LINZ_STAC_CREATED_KEY: first_asset_created,
+                        LINZ_STAC_UPDATED_KEY: first_asset_updated,
+                        STAC_HREF_KEY: second_asset_s3_object.url,
+                        STAC_FILE_CHECKSUM_KEY: first_asset_hex_digest,
+                    }
+                },
+                STAC_LINKS_KEY: [
+                    {
+                        STAC_HREF_KEY: catalog_metadata_url,
+                        STAC_REL_KEY: STAC_REL_ROOT,
+                        STAC_TYPE_KEY: STAC_MEDIA_TYPE_JSON,
+                    },
+                    {
+                        STAC_HREF_KEY: collection_metadata_url,
+                        STAC_REL_KEY: STAC_REL_PARENT,
+                        STAC_TYPE_KEY: STAC_MEDIA_TYPE_JSON,
+                    },
+                    {
+                        STAC_HREF_KEY: item_metadata_url,
+                        STAC_REL_KEY: STAC_REL_SELF,
+                        STAC_TYPE_KEY: STAC_MEDIA_TYPE_JSON,
+                    },
+                ],
+            }
+        ),
+        bucket_name=Resource.STAGING_BUCKET_NAME.resource_name,
+        key=f"{key_prefix}/{item_metadata_filename}",
+    ) as s3_metadata_file:
+        dataset_response = invoke_dataset_lambda_function(lambda_client, dataset_title)
+        dataset_payload = load(dataset_response["Payload"])
+        dataset_id = dataset_payload[BODY_KEY][DATASET_ID_SHORT_KEY]
+
+        # When creating a dataset version
+        dataset_version_creation_response = invoke_dataset_version_lambda_function(
+            lambda_client, dataset_id, s3_metadata_file.url
+        )
+        response_payload = load(dataset_version_creation_response["Payload"])
+        with subtests.test(msg="Dataset Versions endpoint status code"):
+            assert response_payload.get(STATUS_CODE_KEY) == HTTPStatus.CREATED, response_payload
+        dataset_versions_body = response_payload[BODY_KEY]
+
+        with subtests.test(msg="Step function result"):
+            # Then poll for State Machine State
+            state_machine_arn = dataset_versions_body[EXECUTION_ARN_KEY]
+            while (
+                execution := step_functions_client.describe_execution(
+                    executionArn=state_machine_arn
+                )
+            )["status"] == "RUNNING":
+                sleep(5)  # pragma: no cover
+
+            assert execution["status"] == "SUCCEEDED", execution
+
+
+def invoke_dataset_version_lambda_function(
+    lambda_client: LambdaClient, dataset_id: str, metadata_url: str
+) -> InvocationResponseTypeDef:
+    return lambda_client.invoke(
+        FunctionName=Resource.DATASET_VERSIONS_ENDPOINT_FUNCTION_NAME.resource_name,
+        Payload=dumps(
+            {
+                HTTP_METHOD_KEY: "POST",
+                BODY_KEY: {
+                    DATASET_ID_SHORT_KEY: dataset_id,
+                    METADATA_URL_KEY: metadata_url,
+                    S3_ROLE_ARN_KEY: get_s3_role_arn(),
+                },
+            }
+        ).encode(),
+    )
+
+
+def invoke_dataset_lambda_function(
+    lambda_client: LambdaClient, dataset_title: str
+) -> InvocationResponseTypeDef:
+    return lambda_client.invoke(
+        FunctionName=Resource.DATASETS_ENDPOINT_FUNCTION_NAME.resource_name,
+        Payload=dumps(
+            {
+                HTTP_METHOD_KEY: "POST",
+                BODY_KEY: {
+                    DATASET_TITLE_KEY: dataset_title,
+                    DESCRIPTION_KEY: any_dataset_description(),
+                },
+            }
+        ).encode(),
+    )
+
+
 @mark.infrastructure
 def should_not_copy_files_when_there_is_a_checksum_mismatch(
     lambda_client: LambdaClient,
@@ -849,34 +959,13 @@ def should_not_copy_files_when_there_is_a_checksum_mismatch(
         key=f"{key_prefix}/{metadata_filename}",
     ) as s3_metadata_file:

-        dataset_response = lambda_client.invoke(
-            FunctionName=Resource.DATASETS_ENDPOINT_FUNCTION_NAME.resource_name,
-            Payload=dumps(
-                {
-                    HTTP_METHOD_KEY: "POST",
-                    BODY_KEY: {
-                        DATASET_TITLE_KEY: dataset_title,
-                        DESCRIPTION_KEY: any_dataset_description(),
-                    },
-                }
-            ).encode(),
-        )
+        dataset_response = invoke_dataset_lambda_function(lambda_client, dataset_title)
         dataset_payload = load(dataset_response["Payload"])
         dataset_id = dataset_payload[BODY_KEY][DATASET_ID_SHORT_KEY]

         # When creating a dataset version
-        dataset_version_creation_response = lambda_client.invoke(
-            FunctionName=Resource.DATASET_VERSIONS_ENDPOINT_FUNCTION_NAME.resource_name,
-            Payload=dumps(
-                {
-                    HTTP_METHOD_KEY: "POST",
-                    BODY_KEY: {
-                        DATASET_ID_SHORT_KEY: dataset_id,
-                        METADATA_URL_KEY: s3_metadata_file.url,
-                        S3_ROLE_ARN_KEY: get_s3_role_arn(),
-                    },
-                }
-            ).encode(),
+        dataset_version_creation_response = invoke_dataset_version_lambda_function(
+            lambda_client, dataset_id, s3_metadata_file.url
         )

         response_payload = load(dataset_version_creation_response["Payload"])
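The new test waits on the state machine with the usual DescribeExecution polling loop. A standalone sketch of that pattern, assuming boto3 is installed and AWS credentials are configured (wait_for_execution and the ARN in the usage comment are placeholders, not part of this repository):

from time import sleep

import boto3

def wait_for_execution(execution_arn: str) -> dict:
    client = boto3.client("stepfunctions")
    # Poll until the execution leaves RUNNING for a terminal state
    # (SUCCEEDED, FAILED, TIMED_OUT or ABORTED).
    while (execution := client.describe_execution(executionArn=execution_arn))[
        "status"
    ] == "RUNNING":
        sleep(5)  # back off between polls
    return execution

# Usage:
# execution = wait_for_execution("arn:aws:states:us-east-1:123456789012:execution:machine:run")
# assert execution["status"] == "SUCCEEDED", execution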
