Skip to content
This repository has been archived by the owner on Dec 5, 2023. It is now read-only.

Commit

Permalink
add aborted state closing #44 (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkeifer authored Feb 17, 2022
1 parent b01f2e2 commit 64f7cd1
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 190 deletions.
13 changes: 11 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]


## [v0.7.0] - 2022-02-17

### Added

* Support for an `ABORTED` workflow state ([#44])


## [v0.6.2] - 2022-02-07

### Fixed
Expand Down Expand Up @@ -211,7 +218,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

Initial Release

[Unreleased]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.6.2...main
[Unreleased]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.7.0...main
[v0.7.0]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.6.2...v0.7.0
[v0.6.2]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.6.1...v0.6.2
[v0.6.1]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.6.0...v0.6.1
[v0.6.0]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.5.1...v0.6.0
Expand Down Expand Up @@ -244,6 +252,7 @@ Initial Release
[#38]: https://github.com/cirrus-geo/cirrus-lib/pull/38
[#41]: https://github.com/cirrus-geo/cirrus-lib/pull/41

[#44]: https://github.com/cirrus-geo/cirrus-lib/issues/41

[c919fad]: https://github.com/cirrus-geo/cirrus-lib/commit/c919fadb83bb4f5cdfd082d482e25975ce12aa2c
[02ff5e3]: https://github.com/cirrus-geo/cirrus-lib/commit/02ff5e33412026b1fedda97727eef66715a27492

4 changes: 2 additions & 2 deletions src/cirrus/lib/process_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ def from_statedb(cls, collections, state, since: str=None, index: str='input_sta
Args:
collections (str): String of collections (input or output depending on `index`)
state (str): The state (QUEUED, PROCESSING, COMPLETED, FAILED, INVALID) of StateDB Items to get
state (str): The state (QUEUED, PROCESSING, COMPLETED, FAILED, INVALID, ABORTED) of StateDB Items to get
since (str, optional): Get Items since this duration ago (e.g., 10m, 8h, 1w). Defaults to None.
index (str, optional): 'input_state' or 'output_state' Defaults to 'input_state'.
limit ([type], optional): Max number of Items to return. Defaults to None.
Expand Down Expand Up @@ -496,7 +496,7 @@ def process(self, replace=False):
# continue
if payload['id'] in payload_ids:
logger.warning(f"Dropping duplicated payload {payload['id']}")
elif state in ['FAILED', ''] or _replace:
elif state in ['FAILED', 'ABORTED', ''] or _replace:
payload_id = payload()
if payload_id is not None:
payload_ids.append(payload_id)
Expand Down
35 changes: 32 additions & 3 deletions src/cirrus/lib/statedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# envvars
PAYLOAD_BUCKET = os.getenv('CIRRUS_PAYLOAD_BUCKET')

STATES = ['PROCESSING', 'COMPLETED', 'FAILED', 'INVALID']
STATES = ['PROCESSING', 'COMPLETED', 'FAILED', 'INVALID', 'ABORTED']

# logging
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -123,7 +123,7 @@ def get_items_page(self, collections_workflow: str,
Args:
collections_workflow (str): /-separated list of input collections_workflow
state (Optional[str], optional): State of Items to get (PROCESSING, COMPLETED, FAILED, INVALID)
state (Optional[str], optional): State of Items to get (PROCESSING, COMPLETED, FAILED, INVALID, ABORTED)
since (Optional[str], optional): Get Items since this amount of time in the past. Defaults to None.
sort_ascending (Optional[bool], optional): Determines which direction the index of the results will be sorted. Defaults to False.
sort_index (Optional[str], optional): Determines which index to use for sorting, if not applying a filter (state_updated, updated). Defaults to None.
Expand Down Expand Up @@ -171,7 +171,7 @@ def get_state(self, payload_id: str) -> str:
payload_id (str): The Payload ID
Returns:
str: Current state: PROCESSING, COMPLETED, FAILED, INVALID
str: Current state: PROCESSING, COMPLETED, FAILED, INVALID, ABORTED
"""
response = self.table.get_item(Key=self.payload_id_to_key(payload_id))
if 'Item' in response:
Expand Down Expand Up @@ -368,6 +368,35 @@ def set_invalid(self, payload_id: str, msg: str) -> str:
logger.debug("set invalid", extra=key.update({'last_error': msg}))
return response

def set_aborted(self, payload_id: str) -> str:
"""Set this item as ABORTED
Args:
payload_id (str): The Cirrus Payload
Returns:
str: DynamoDB response
"""
now = datetime.now(timezone.utc).isoformat()
key = self.payload_id_to_key(payload_id)

expr = (
'SET '
'created = if_not_exists(created, :created), '
'state_updated=:state_updated, updated=:updated'
)
response = self.table.update_item(
Key=key,
UpdateExpression=expr,
ExpressionAttributeValues={
':created': now,
':state_updated': f"ABORTED_{now}",
':updated': now,
}
)
logger.debug("set aborted")
return response

def query(self, collections_workflow: str, state: str=None, since: str=None,
select: str='ALL_ATTRIBUTES', sort_ascending: bool=False, sort_index: str='updated', **kwargs) -> Dict:
"""Perform a single Query on a DynamoDB index
Expand Down
5 changes: 5 additions & 0 deletions tests/test_process_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ def base_payload():
return read_json_fixture('test-payload.json')


@pytest.fixture()
def capella_payload():
return read_json_fixture('capella-fixture-2.json')


@pytest.fixture()
def sqs_event():
return read_json_fixture('sqs-event.json')
Expand Down
Loading

0 comments on commit 64f7cd1

Please sign in to comment.