Skip to content
This repository has been archived by the owner on Nov 21, 2023. It is now read-only.

Upgrade to support pangeo-forge-recipes 0.4.0 release api changes. #15

Merged
merged 4 commits into from
Jul 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions pangeo_forge_prefect/flow_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from prefect import storage
from prefect.executors import DaskExecutor
from prefect.run_configs import ECSRun, KubernetesRun
from rechunker.executors import PrefectPipelineExecutor
from s3fs import S3FileSystem

from pangeo_forge_prefect.meta_types.bakery import (
Expand Down Expand Up @@ -278,9 +277,7 @@ def recipe_to_flow(
recipe.metadata_cache = targets.target

dask_executor = configure_dask_executor(bakery.cluster, meta.bakery, recipe_id, secrets)
executor = PrefectPipelineExecutor()
pipeline = recipe.to_pipelines()
flow = executor.pipelines_to_plan(pipeline)
flow = recipe.to_prefect()
flow.storage = configure_flow_storage(bakery.cluster, secrets)
run_config = configure_run_config(bakery.cluster, meta.bakery, recipe_id, secrets)
flow.run_config = run_config
Expand All @@ -290,8 +287,7 @@ def recipe_to_flow(
flow_task.run = set_log_level(flow_task.run)

flow.name = recipe_id
project_name = os.environ["PREFECT_PROJECT_NAME"]
flow.register(project_name=project_name)
return flow


def register_flow(meta_path: str, bakeries_path: str, secrets: Dict, versions: Versions):
Expand Down Expand Up @@ -319,16 +315,19 @@ def register_flow(meta_path: str, bakeries_path: str, secrets: Dict, versions: V
bakery = from_dict(data_class=Bakery, data=bakeries_dict[meta.bakery.id])

check_versions(meta, bakery.cluster, versions)
project_name = os.environ["PREFECT_PROJECT_NAME"]

for recipe_meta in meta.recipes:
if recipe_meta.dict_object:
recipes_dict = get_module_attribute(meta_path, recipe_meta.dict_object)
for key, value in recipes_dict.items():
extension = get_target_extension(value)
targets = configure_targets(bakery, meta.bakery, key, secrets, extension)
recipe_to_flow(bakery, meta, key, value, targets, secrets)
flow = recipe_to_flow(bakery, meta, key, value, targets, secrets)
flow.register(project_name=project_name)
else:
recipe = get_module_attribute(meta_path, recipe_meta.object)
extension = get_target_extension(recipe)
targets = configure_targets(bakery, meta.bakery, recipe_meta.id, secrets, extension)
recipe_to_flow(bakery, meta, recipe_meta.id, recipe, targets, secrets)
flow = recipe_to_flow(bakery, meta, recipe_meta.id, recipe, targets, secrets)
flow.register(project_name=project_name)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
name="pangeo_forge_prefect",
packages=find_packages(),
install_requires=[
"pangeo-forge-recipes>=0.3.3",
"pangeo-forge-recipes>=0.4.0",
"pyyaml==5.4.1",
"prefect[aws]>=0.14.13",
"dacite==1.6.0",
Expand Down
30 changes: 15 additions & 15 deletions test/data/bakeries.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
devseed.bakery.development.aws.us-west-2:
region: aws.us-west-2
targets:
pangeo-forge-aws-bakery-flowcachebucketpangeofor-196cpck7y0pbl:
pangeo-forge-aws-bakery-flowcachebucketdasktest4-10neo67y7a924:
region: aws.us-west-2
description: "Internal bucket"
private:
Expand All @@ -12,18 +12,18 @@ devseed.bakery.development.aws.us-west-2:
secret: DEVSEED_BAKERY_DEVELOPMENT_AWS_US_WEST_2_SECRET
cluster:
type: aws.fargate
pangeo_forge_version: "0.3.3"
pangeo_notebook_version: "2021.05.04"
prefect_version: "0.14.7"
worker_image: 552819999234.dkr.ecr.us-west-2.amazonaws.com/pangeo-forge-aws-bakery-worker
pangeo_forge_version: "0.4.0"
pangeo_notebook_version: "2021.06.05"
prefect_version: "0.14.22"
worker_image: pangeo/pangeo-forge-bakery-images:pangeonotebook-2021.06.05_prefect-0.14.22_pangeoforgerecipes-0.4.0
cluster_options:
vpc: vpc-0e519fd83fa521d72
cluster_arn: arn:aws:ecs:us-west-2:552819999234:cluster/pangeo-forge-aws-bakery-pangeo-forge-dask-bakeryclusterpangeoforgedask71B831F8-BTL3Vmp8cuso
task_role_arn: arn:aws:iam::552819999234:role/pangeo-forge-aws-bakery-p-prefectecstaskrolepangeo-3R73K3Z1XU70
execution_role_arn: arn:aws:iam::552819999234:role/pangeo-forge-aws-bakery-p-prefectecstaskexecutionr-5ZGMH0A8LVXF
vpc: vpc-01160815e8310bbe0
cluster_arn: arn:aws:ecs:us-west-2:552819999234:cluster/pangeo-forge-aws-bakery-dask-test-bakeryclusterdasktest4E1A8264-k6Cnpc8EuUpH
task_role_arn: arn:aws:iam::552819999234:role/pangeo-forge-aws-bakery-d-prefectecstaskroledaskte-XUA2HGW10IJV
execution_role_arn: arn:aws:iam::552819999234:role/pangeo-forge-aws-bakery-d-prefectecstaskexecutionr-KLL5UEHNBF8Z
security_groups:
- sg-0c4d6e997637c801d
flow_storage: pangeo-forge-aws-bakery-flowstoragebucketpangeof-71w6gsnambj9
- sg-0ca6b9d46294e5623
flow_storage: pangeo-forge-aws-bakery-flowstoragebucketdasktes-1bchocmkc3w0
flow_storage_protocol: s3
flow_storage_options:
key: DEVSEED_BAKERY_DEVELOPMENT_AWS_US_WEST_2_KEY
Expand All @@ -41,10 +41,10 @@ devseed.bakery.development.azure.ukwest:
secret: DEVSEED_BAKERY_DEVELOPMENT_AZURE_UKWEST_CONNECTION_STRING
cluster:
type: azure.aks
pangeo_forge_version: "0.3.4"
pangeo_notebook_version: "2021.05.15"
prefect_version: "0.14.19"
worker_image: pangeo/pangeo-forge-bakery-images:pangeonotebook-2021.05.15_prefect-0.14.19_pangeoforgerecipes-0.3.4
pangeo_forge_version: "0.4.0"
pangeo_notebook_version: "2021.06.05"
prefect_version: "0.14.22"
worker_image: pangeo/pangeo-forge-bakery-images:pangeonotebook-2021.06.05_prefect-0.14.22_pangeoforgerecipes-0.4.0
flow_storage: test-bakery-flow-storage-container
flow_storage_protocol: abfs
flow_storage_options:
Expand Down
6 changes: 3 additions & 3 deletions test/data/meta_aws.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
title: "NOAA Optimum Interpolated SST"
description: "Analysis-ready Zarr datasets derived from NOAA OISST NetCDF"
pangeo_forge_version: "0.3.3"
pangeo_notebook_version: "2021.05.04"
pangeo_forge_version: "0.4.0"
pangeo_notebook_version: "2021.06.05"
recipes:
- id: noaa-oisst-avhrr-only
object: "recipe:recipe"
Expand All @@ -20,7 +20,7 @@ maintainers:
github: rabernat
bakery:
id: "devseed.bakery.development.aws.us-west-2" # must come from a valid list of bakeries
target: pangeo-forge-aws-bakery-flowcachebucketpangeofor-196cpck7y0pbl
target: pangeo-forge-aws-bakery-flowcachebucketdasktest4-10neo67y7a924
resources:
memory: 4096
cpu: 1024
26 changes: 14 additions & 12 deletions test/data/recipe.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import pandas as pd
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.recipes import XarrayZarrRecipe

input_url_pattern = (
"https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation"
"/v2.1/access/avhrr/{yyyymm}/oisst-avhrr-v02r01.{yyyymmdd}.nc"
)
dates = pd.date_range("2019-09-01", "2021-01-05", freq="D")
input_urls = [
input_url_pattern.format(yyyymm=day.strftime("%Y%m"), yyyymmdd=day.strftime("%Y%m%d"))
for day in dates
]
pattern = pattern_from_file_sequence(input_urls, "time", nitems_per_file=1)

recipe = XarrayZarrRecipe(pattern, inputs_per_chunk=20)
def format_function(time):
base = pd.Timestamp("2019-09-01")
day = base + pd.Timedelta(days=time)
input_url_pattern = (
"https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation"
"/v2.1/access/avhrr/{day:%Y%m}/oisst-avhrr-v02r01.{day:%Y%m%d}.nc"
)
return input_url_pattern.format(day=day)


dates = pd.date_range("2019-09-01", "2021-01-05", freq="D")
pattern = FilePattern(format_function, ConcatDim("time", range(len(dates)), 1))
recipe = XarrayZarrRecipe(pattern, inputs_per_chunk=20, cache_inputs=True)
98 changes: 55 additions & 43 deletions test/test_flow_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@
import pathlib
from unittest.mock import patch

import fsspec
import pytest
import yaml
from dacite import from_dict
from dask_cloudprovider.aws.ecs import FargateCluster
from dask_kubernetes import KubeCluster
from pangeo_forge_recipes.recipes import XarrayZarrRecipe
from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget
from prefect.core import Flow
from prefect.run_configs import ECSRun, KubernetesRun

from pangeo_forge_prefect.flow_manager import (
Targets,
UnsupportedClusterType,
UnsupportedFlowStorage,
UnsupportedPangeoVersion,
Expand All @@ -23,6 +27,7 @@
configure_targets,
get_module_attribute,
get_target_extension,
recipe_to_flow,
)
from pangeo_forge_prefect.meta_types.bakery import Bakery
from pangeo_forge_prefect.meta_types.meta import Meta
Expand Down Expand Up @@ -88,17 +93,40 @@ def k8s_job_template():
return job_template


@patch("pangeo_forge_prefect.flow_manager.S3FileSystem")
def test_configure_targets_aws(S3FileSystem, aws_bakery, meta_aws):
recipe_name = "test"
key = "key"
secret = "secret"
extension = "zarr"
secrets = {
recipe_name = "test"
key = "key"
secret = "secret"
extension = "zarr"


@pytest.fixture
def secrets():
secret_values = {
"DEVSEED_BAKERY_DEVELOPMENT_AWS_US_WEST_2_KEY": key,
"DEVSEED_BAKERY_DEVELOPMENT_AWS_US_WEST_2_SECRET": secret,
"DEVSEED_BAKERY_DEVELOPMENT_AZURE_UKWEST_CONNECTION_STRING": secret,
"GITHUB_REPOSITORY": "staged-recipes",
}
return secret_values


@pytest.fixture()
def tmp_target(tmpdir_factory):
fs = fsspec.get_filesystem_class("file")()
path = str(tmpdir_factory.mktemp("target"))
return FSSpecTarget(fs, path)


@pytest.fixture()
def tmp_cache(tmpdir_factory):
path = str(tmpdir_factory.mktemp("cache"))
fs = fsspec.get_filesystem_class("file")()
cache = CacheFSSpecTarget(fs, path)
return cache


@patch("pangeo_forge_prefect.flow_manager.S3FileSystem")
def test_configure_targets_aws(S3FileSystem, aws_bakery, meta_aws, secrets):
targets = configure_targets(aws_bakery, meta_aws.bakery, recipe_name, secrets, extension)
S3FileSystem.assert_called_once_with(
anon=False,
Expand All @@ -111,21 +139,14 @@ def test_configure_targets_aws(S3FileSystem, aws_bakery, meta_aws):
f"s3://{meta_aws.bakery.target}/pangeo-forge/staged-recipes/{recipe_name}.zarr"
)
aws_bakery.targets[
"pangeo-forge-aws-bakery-flowcachebucketpangeofor-196cpck7y0pbl"
"pangeo-forge-aws-bakery-flowcachebucketdasktest4-10neo67y7a924"
].private.protocol = "GCS"
with pytest.raises(UnsupportedTarget):
configure_targets(aws_bakery, meta_aws.bakery, recipe_name, secrets, extension)


@patch("pangeo_forge_prefect.flow_manager.AzureBlobFileSystem")
def test_configure_targets_azure(AzureBlobFileSystem, azure_bakery, meta_azure):
recipe_name = "test"
secret = "secret"
extension = "zarr"
secrets = {
"DEVSEED_BAKERY_DEVELOPMENT_AZURE_UKWEST_CONNECTION_STRING": secret,
"GITHUB_REPOSITORY": "staged-recipes",
}
def test_configure_targets_azure(AzureBlobFileSystem, azure_bakery, meta_azure, secrets):
targets = configure_targets(azure_bakery, meta_azure.bakery, recipe_name, secrets, extension)
AzureBlobFileSystem.assert_called_once_with(
connection_string=secret,
Expand All @@ -139,13 +160,7 @@ def test_configure_targets_azure(AzureBlobFileSystem, azure_bakery, meta_azure):


@patch("pangeo_forge_prefect.flow_manager.storage")
def test_configure_flow_storage_aws(storage, aws_bakery):
key = "key"
secret = "secret"
secrets = {
"DEVSEED_BAKERY_DEVELOPMENT_AWS_US_WEST_2_KEY": key,
"DEVSEED_BAKERY_DEVELOPMENT_AWS_US_WEST_2_SECRET": secret,
}
def test_configure_flow_storage_aws(storage, aws_bakery, secrets):
configure_flow_storage(aws_bakery.cluster, secrets)
storage.S3.assert_called_once_with(
bucket=aws_bakery.cluster.flow_storage,
Expand All @@ -157,11 +172,7 @@ def test_configure_flow_storage_aws(storage, aws_bakery):


@patch("pangeo_forge_prefect.flow_manager.storage")
def test_configure_flow_storage_azure(storage, azure_bakery):
secret = "A_CONNECTION_STRING"
secrets = {
"DEVSEED_BAKERY_DEVELOPMENT_AZURE_UKWEST_CONNECTION_STRING": secret,
}
def test_configure_flow_storage_azure(storage, azure_bakery, secrets):
configure_flow_storage(azure_bakery.cluster, secrets)
storage.Azure.assert_called_once_with(
container=azure_bakery.cluster.flow_storage,
Expand Down Expand Up @@ -193,12 +204,7 @@ def test_configure_dask_executor_aws(aws_bakery, meta_aws):


@patch("pangeo_forge_prefect.flow_manager.make_pod_spec")
def test_configure_dask_executor_azure(make_pod_spec, azure_bakery, meta_azure):
recipe_name = "test"
secret = "A_CONNECTION_STRING"
secrets = {
"DEVSEED_BAKERY_DEVELOPMENT_AZURE_UKWEST_CONNECTION_STRING": secret,
}
def test_configure_dask_executor_azure(make_pod_spec, azure_bakery, meta_azure, secrets):
dask_executor = configure_dask_executor(
azure_bakery.cluster, meta_azure.bakery, recipe_name, secrets
)
Expand Down Expand Up @@ -242,12 +248,7 @@ def test_configure_run_config_aws(aws_bakery, meta_aws):
configure_run_config(aws_bakery.cluster, meta_aws.bakery, recipe_name, {})


def test_configure_run_config_azure(azure_bakery, meta_azure, k8s_job_template):
recipe_name = "test"
secret = "A_CONNECTION_STRING"
secrets = {
"DEVSEED_BAKERY_DEVELOPMENT_AZURE_UKWEST_CONNECTION_STRING": secret,
}
def test_configure_run_config_azure(azure_bakery, meta_azure, k8s_job_template, secrets):
run_config = configure_run_config(azure_bakery.cluster, meta_azure.bakery, recipe_name, secrets)
assert type(run_config) == KubernetesRun
assert meta_azure.bakery.id in run_config.labels
Expand All @@ -263,9 +264,9 @@ def test_configure_run_config_azure(azure_bakery, meta_azure, k8s_job_template):

def test_check_versions(aws_bakery, meta_aws):
versions = Versions(
pangeo_notebook_version="2021.05.04",
pangeo_forge_version="0.3.3",
prefect_version="0.14.7",
pangeo_notebook_version="2021.06.05",
pangeo_forge_version="0.4.0",
prefect_version="0.14.22",
)
assert check_versions(meta_aws, aws_bakery.cluster, versions)
versions.pangeo_notebook_version = "none"
Expand All @@ -289,3 +290,14 @@ def test_get_target_extension():

with pytest.raises(UnsupportedRecipeType):
get_target_extension({})


@patch.dict(os.environ, {"PREFECT_PROJECT_NAME": "project"})
def test_recipe_to_flow(aws_bakery, meta_aws, secrets, tmp_target, tmp_cache):
meta_path = pathlib.Path(__file__).parent.absolute().joinpath("./data/meta.yaml")
recipe = get_module_attribute(meta_path, meta_aws.recipes[-1].object)

targets = Targets(target=tmp_target, cache=tmp_cache)

flow = recipe_to_flow(aws_bakery, meta_aws, "recipe_id", recipe, targets, secrets)
assert isinstance(flow, Flow)