From 41c0611e09d96d58bbb9a7117dbb08dcdef3418a Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Fri, 8 Sep 2023 15:31:12 +0100 Subject: [PATCH] Add ecmwf daily scheduled job (#8) --- .github/workflows/ci.yml | 2 +- dags_tests/compile_test.py | 2 +- nwp/__init__.py | 3 +- nwp/assets/ecmwf/mars.py | 14 +++++---- nwp/jobs.py | 64 ++++++++++++++++++++++---------------- pyproject.toml | 10 +++--- 6 files changed, 53 insertions(+), 42 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5cccead..60f14b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,7 +38,7 @@ jobs: # Should mirror the build-venv stage in the Containerfile - name: Build venv run: | - apt -qq update && apt -qq install -y build-essential + sudo apt -qq update && sudo apt -qq install -y build-essential libgeos-dev python -m venv ./venv ./venv/bin/pip install --upgrade -q pip wheel setuptools if: steps.restore-cache.outputs.cache-hit != 'true' diff --git a/dags_tests/compile_test.py b/dags_tests/compile_test.py index c979b12..e915e32 100644 --- a/dags_tests/compile_test.py +++ b/dags_tests/compile_test.py @@ -3,5 +3,5 @@ def test_compiles(): job_names = [d.name for d in list(defs.get_all_job_defs())] - assert "get_ecmwf_data" in job_names + assert "nwp_consumer_docker_job" in job_names assert len(job_names) == 18 diff --git a/nwp/__init__.py b/nwp/__init__.py index 1517e8e..5885e95 100644 --- a/nwp/__init__.py +++ b/nwp/__init__.py @@ -7,6 +7,5 @@ defs = Definitions( assets=all_assets, - jobs=[jobs.get_ecmwf_data], - schedules=jobs.schedule_jobs, + schedules=jobs.schedules, ) diff --git a/nwp/assets/ecmwf/mars.py b/nwp/assets/ecmwf/mars.py index dab74b4..4669d80 100644 --- a/nwp/assets/ecmwf/mars.py +++ b/nwp/assets/ecmwf/mars.py @@ -7,6 +7,7 @@ class NWPConsumerConfig(Config): date_from: str date_to: str source: str + docker_volumes: list[str] @op def nwp_consumer_docker_op(context: OpExecutionContext, config: 
NWPConsumerConfig): @@ -18,13 +19,14 @@ def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfi f'--from={config.date_from}', f'--to={config.date_to}' ], - env_vars=["ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL"], + env_vars=[ + "ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL", + "CEDA_FTP_USER", "CEDA_FTP_PASS", + "METOFFICE_ORDER_ID", "METOFFICE_CLIENT_ID", "METOFFICE_CLIENT_SECRET", + "AWS_S3_BUCKET", "AWS_REGION", "AWS_ACCESS_KEY", "AWS_ACCESS_SECRET", + ], container_kwargs={ - "volumes": [ - '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw', - '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr', - '/tmp/nwpc:/tmp/nwpc' - ] + "volumes": config.docker_volumes } ) diff --git a/nwp/jobs.py b/nwp/jobs.py index 6ba61c2..9d0184a 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -1,27 +1,23 @@ -from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig +from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig, ScheduleEvaluationContext, RunRequest from nwp.assets.dwd.common import IconConfig from nwp.assets.ecmwf.mars import nwp_consumer_docker_op, NWPConsumerConfig import datetime as dt - -base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" - +dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" def build_config_on_runtime(model, run, delay=0): config = IconConfig(model=model, run=run, delay=delay, - folder=f"{base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}", - zarr_path=f"{base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip") + folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}", + zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip") config_dict = {"delay": 
config.delay, "folder": config.folder, "model": config.model, "run": config.run, "zarr_path": config.zarr_path} return config_dict - -asset_jobs = [] -schedule_jobs = [] +schedules = [] for r in ["00", "06", "12", "18"]: for model in ["global", "eu"]: for delay in [0, 1]: @@ -36,31 +32,45 @@ def build_config_on_runtime(model, run, delay=0): ) match (delay, r): case (0, "00"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")) case (0, "06"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")) case (0, "12"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *")) case (0, "18"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *")) case (1, "00"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *")) case (1, "06"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *")) case (1, "12"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *")) case (1, "18"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) - asset_jobs.append(asset_job) - -@job(config=RunConfig( - ops={"nwp_consumer_docker_op": NWPConsumerConfig( - date_from="2021-01-01", - date_to="2021-01-01", - 
source="ecmwf-mars" - )} - )) -def get_ecmwf_data(): +@job +def nwp_consumer_docker_job(): nwp_consumer_docker_op() +@schedule(job=nwp_consumer_docker_job, cron_schedule="0 13 * * *") +def ecmwf_daily_local_archive_schedule(context: ScheduleEvaluationContext): + scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d") + return RunRequest( + run_key=None, + run_config=RunConfig( + ops={"nwp_consumer_docker_op": NWPConsumerConfig( + date_from=scheduled_date, + date_to=scheduled_date, + source="ecmwf-mars", + docker_volumes=[ + '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw', + '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr', + '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/tmp:/tmp/nwpc' + ] + )} + ), + tags={"date": scheduled_date}, + ) + +schedules.append(ecmwf_daily_local_archive_schedule) + diff --git a/pyproject.toml b/pyproject.toml index 1432dc0..da82544 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,13 +24,13 @@ dependencies = [ "dagster-cloud == 1.4.11", "dagster-docker == 0.20.11", "huggingface-hub == 0.16.4", - "numpy == 1.24.2", + "numpy >= 1.23.0", "ocf-blosc2 == 0.0.3", "pathlib == 1.0.1", - "requests == 2.31.0", - "xarray == 2023.2.0", - "zarr == 2.14.2", - "satip == 2.11.10", + "requests >= 2.28.0", + "xarray >= 2022.3.0", + "zarr >= 2.13.3", + "satip >= 2.11.10", ] [project.optional-dependencies]