Skip to content
This repository has been archived by the owner on Dec 5, 2023. It is now read-only.

Task class #43

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0c01047
initial draft of a Task class
matthewhanson Feb 14, 2022
cd5c1ed
Merge branch 'main' into features/task-class
matthewhanson Mar 22, 2022
a4ad49d
updated Task class and add example task
matthewhanson Mar 24, 2022
5306265
change workdir/save logic
matthewhanson Mar 29, 2022
e2c826c
default save value of None in task cli
matthewhanson Apr 3, 2022
4bb451f
use loglevel enum
matthewhanson Apr 9, 2022
b931867
split out cli parser from parsing
matthewhanson Apr 9, 2022
fe51ccd
split out cli parser from parsing
matthewhanson Apr 9, 2022
fabc6b8
use pathlib for paths
matthewhanson Apr 9, 2022
885474b
NothingTask for testing Task class
matthewhanson Apr 9, 2022
ffd4d02
return new items from process function
matthewhanson Apr 11, 2022
32dfd4d
add properties to Task class
matthewhanson Apr 12, 2022
59a331c
cleanup
matthewhanson Apr 12, 2022
da6f941
add initial task tests
matthewhanson Apr 12, 2022
aaf0ec7
update CHANGELOG with task class
matthewhanson Apr 12, 2022
5cec1a8
use VCR for testing download assets
matthewhanson Apr 12, 2022
985477e
more download asset test
matthewhanson Apr 12, 2022
8ccce48
update task output payload
matthewhanson Apr 12, 2022
e213454
Update src/cirrus/lib/task.py
matthewhanson Apr 23, 2022
b61fdca
update dev requirements
matthewhanson May 13, 2022
599e3a2
Merge branch 'features/task-class' of github.com:cirrus-geo/cirrus-li…
matthewhanson May 13, 2022
0328c8e
make validate a classmethod
matthewhanson May 14, 2022
10fb40f
update CHANGELOG
matthewhanson May 27, 2022
3f21e67
cleanup task
matthewhanson May 27, 2022
3eaf0b9
make create_item_from_item staticmethod
matthewhanson May 27, 2022
6812428
update access to payload
matthewhanson May 27, 2022
9da9121
update tests
matthewhanson May 27, 2022
518d829
download and upoad assets take in item
matthewhanson May 27, 2022
f6a6dd0
return item from upload when in local mode
matthewhanson May 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Added

* Abstract base class `Task` for creating new tasks ([#42])
matthewhanson marked this conversation as resolved.
Show resolved Hide resolved


## [v0.7.0] - 2022-02-17

Expand Down
11 changes: 6 additions & 5 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mock==4.0.2
pytest~=6.2
pytest-cov~=2.9
moto~=1.3.16
flake8~=3.8
mock~=4.0.2
pytest~=7.1
pytest-cov~=3.0
moto~=3.1
flake8~=4.0
vcrpy~=4.1
235 changes: 235 additions & 0 deletions src/cirrus/lib/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
from abc import ABC, abstractmethod #abstractclassmethod
import argparse
from collections import OrderedDict
from concurrent.futures import process
matthewhanson marked this conversation as resolved.
Show resolved Hide resolved
from copy import deepcopy
import json
import logging
from operator import getitem
matthewhanson marked this conversation as resolved.
Show resolved Hide resolved
from os import makedirs
from pathlib import Path
from shutil import rmtree
import sys
from tempfile import mkdtemp
from typing import Dict, List, Optional, Union

from boto3utils import s3

from cirrus.lib.logging import get_task_logger
from cirrus.lib.process_payload import ProcessPayload
from cirrus.lib.transfer import download_item_assets, upload_item_assets

# types
PathLike = Union[str, Path]


class Task(ABC):

_name = 'task'
_description = 'A task for doing things'
_version = '0.1.0'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems weird to me to default these things. Shouldn't they be required parameters on any subclasses? That said, validating that they are set on subclasses likely would require using the __new__ method on a custom metaclass. Perhaps we make an issue to follow up on this instead of trying to resolve it now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are static attributes and this was the easiest way to define them, but yes they should be required. Every base class should define them, so open to options on how to better do that.


def __init__(self: "Task", payload: Dict,
local: Optional[bool]=False,
workdir: Optional[PathLike]=None,
skip_validation: Optional[bool] = False):
# parse event
payload = ProcessPayload.from_event(payload)

if not skip_validation:
self.validate(payload)

self._payload = payload

# The original items from the payload
self.original_items = deepcopy(payload['features'])

self.items = self._payload['features']

# set up logger
self.logger = get_task_logger(f"task.{self._name}", payload=payload)

# local mode?
self._local = local

# create temporary work directory if workdir is None
self._workdir = workdir
if workdir is None:
self._workdir = Path(mkdtemp())
self._tmpworkdir = True
else:
self._workdir = Path(workdir)
self._tmpworkdir = False
makedirs(self._workdir, exist_ok=True)

def __del__(self):
# remove work directory if not running locally
if self._tmpworkdir:
self.logger.debug(f"Removing work directory {self._workdir}")
rmtree(self._workdir)

@property
def id(self) -> str:
return self._payload['id']

@property
def output_payload(self):
processing_ext = 'https://stac-extensions.github.io/processing/v1.1.0/schema.json'
for i in self.items:
i['stac_extensions'].append(processing_ext)
i['stac_extensions'] = list(set(i['stac_extensions']))
i['properties']['processing:software'] = {
self._name: self._version
}
self._payload['features'] = self.items
# add provenance metadata

return self._payload

@property
def process_definition(self) -> Dict:
return self._payload['process']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have an incompatibility here with workflow chaining. You should use self._payload.process instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth adding a workflow chaining test case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.
Test for workflow chaining test case sounds like a good idea. Not it.


@property
def parameters(self) -> Dict:
return self.process_definition['tasks'].get(self._name, {})

@property
def output_options(self) -> Dict:
return self.process_definition.get('output_options', {})

@classmethod
def validate(cls, payload) -> bool:
# put validation logic on input Items and process definition here
return True

def download_assets(self, assets: Optional[List[str]]=None):
"""Download provided asset keys for all items in payload. Assets are saved in workdir in a
directory named by the Item ID, and the items are updated with the new asset hrefs.

Args:
assets (Optional[List[str]], optional): List of asset keys to download. Defaults to all assets.
"""
for i, item in enumerate(self.items):
outdir = self._workdir / Path(item['id'])
makedirs(outdir, exist_ok=True)
self.items[i] = download_item_assets(item, path=outdir, assets=assets)

def upload_assets(self, assets: Optional[List[str]]=None):
if self._local:
self.logger.warn('Running in local mode, assets not uploaded')
return
for i, item in enumerate(self.items):
self.items[i] = upload_item_assets(item, **self.output_options)

# this should be in PySTAC
@classmethod
matthewhanson marked this conversation as resolved.
Show resolved Hide resolved
def create_item_from_item(self, item):
# create a derived output item
links = [l['href'] for l in item['links'] if l['rel'] == 'self']
if len(links) == 1:
# add derived from link
item ['links'].append({
'title': 'Source STAC Item',
'rel': 'derived_from',
'href': links[0],
'type': 'application/json'
})
return item

@abstractmethod
def process(self, **kwargs) -> List[Dict]:
"""Main task logic - virtual

Returns:
[type]: [description]
"""
# download assets of interest, this will update self.items
#self.download_assets(['key1', 'key2'])
# do some stuff
#self.upload_assets(['key1', 'key2'])
return self.items

@classmethod
def handler(cls, payload, **kwargs):
task = cls(payload, **kwargs)
try:
task.items = task.process(**task.parameters)
return task.output_payload
except Exception as err:
task.logger.error(err, exc_info=True)
raise err

@classmethod
def get_cli_parser(cls):
""" Parse CLI arguments """
dhf = argparse.ArgumentDefaultsHelpFormatter
parser0 = argparse.ArgumentParser(description=cls._description)
parser0.add_argument('--version', help='Print version and exit', action='version', version=cls._version)

pparser = argparse.ArgumentParser(add_help=False)
pparser.add_argument('--logging', default='INFO', help='DEBUG, INFO, WARN, ERROR, CRITICAL')
subparsers = parser0.add_subparsers(dest='command')

# process subcommand
h = 'Locally process (development)'
parser = subparsers.add_parser('local', parents=[pparser], help=h, formatter_class=dhf)
parser.add_argument('filename', help='Full path of payload to process')
h = 'Use this as work directory. Will be created but not deleted)'
parser.add_argument('--workdir', help=h, default=None, type=Path)
parser.add_argument('--save', help='Save output with provided filename', default=None)

# Cirrus process subcommand
h = 'Process Cirrus STAC Process Catalog'
parser = subparsers.add_parser('cirrus', parents=[pparser], help=h, formatter_class=dhf)
parser.add_argument('url', help='url (s3 or local) to Cirrus Process Payload')
return parser0

@classmethod
def parse_args(cls, args, parser=None):
if parser is None:
parser = cls.get_cli_parser()
# turn Namespace into dictionary
pargs = vars(parser.parse_args(args))
# only keep keys that are not None
pargs = {k: v for k, v in pargs.items() if v is not None}

if pargs.get('command', None) is None:
parser.print_help()
matthewhanson marked this conversation as resolved.
Show resolved Hide resolved
sys.exit(0)

return pargs

@classmethod
def cli(cls, parser=None):
args = cls.parse_args(sys.argv[1:], parser=parser)
cmd = args.pop('command')

# logging
loglevel = args.pop('logging')
logging.basicConfig(stream=sys.stdout,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=loglevel)
# quiet these loud loggers
quiet_loggers = ['botocore', 's3transfer', 'urllib3']
for ql in quiet_loggers:
logging.getLogger(ql).propagate = False

if cmd == 'local':
save = args.pop('save', None)
# open local payload
with open(args.pop('filename')) as f:
payload = json.loads(f.read())
# run task handler
output = cls.handler(payload, local=True, **args)
# save task output
if save:
with open(save, 'w') as f:
f.write(json.dumps(output))
if cmd == 'cirrus':
# get remote payload
payload = s3().read_json(args['url'])
# run task handler
output = cls.handler(payload)
# upload task output
s3().upload_json(output, args["url"].replace('.json', '_out.json'))
Loading