Skip to content

Commit

Permalink
Add ecmwf daily scheduled job (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc authored Sep 8, 2023
1 parent 4d044dc commit 41c0611
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
# Should mirror the build-venv stage in the Containerfile
- name: Build venv
run: |
apt -qq update && apt -qq install -y build-essential
sudo apt -qq update && sudo apt -qq install -y build-essential libgeos-dev
python -m venv ./venv
./venv/bin/pip install --upgrade -q pip wheel setuptools
if: steps.restore-cache.outputs.cache-hit != 'true'
Expand Down
2 changes: 1 addition & 1 deletion dags_tests/compile_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

def test_compiles():
job_names = [d.name for d in list(defs.get_all_job_defs())]
assert "get_ecmwf_data" in job_names
assert "nwp_consumer_docker_job" in job_names
assert len(job_names) == 18
3 changes: 1 addition & 2 deletions nwp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@

defs = Definitions(
assets=all_assets,
jobs=[jobs.get_ecmwf_data],
schedules=jobs.schedule_jobs,
schedules=jobs.schedules,
)
14 changes: 8 additions & 6 deletions nwp/assets/ecmwf/mars.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class NWPConsumerConfig(Config):
date_from: str
date_to: str
source: str
docker_volumes: list[str]

@op
def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfig):
Expand All @@ -18,13 +19,14 @@ def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfi
f'--from={config.date_from}',
f'--to={config.date_to}'
],
env_vars=["ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL"],
env_vars=[
"ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL",
"CEDA_FTP_USER", "CEDA_FTP_PASS",
"METOFFICE_ORDER_ID", "METOFFICE_CLIENT_ID", "METOFFICE_CLIENT_SECRET",
"AWS_S3_BUCKET", "AWS_REGION", "AWS_ACCESS_KEY", "AWS_ACCESS_SECRET",
],
container_kwargs={
"volumes": [
'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw',
'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr',
'/tmp/nwpc:/tmp/nwpc'
]
"volumes": docker_volumes
}
)

Expand Down
64 changes: 37 additions & 27 deletions nwp/jobs.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig
from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig, ScheduleEvaluationContext

from nwp.assets.dwd.common import IconConfig
from nwp.assets.ecmwf.mars import nwp_consumer_docker_op, NWPConsumerConfig

import datetime as dt


base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"

dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"

def build_config_on_runtime(model, run, delay=0):
config = IconConfig(model=model,
run=run,
delay=delay,
folder=f"{base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}",
zarr_path=f"{base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip")
folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}",
zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip")
config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run,
"zarr_path": config.zarr_path}
return config_dict


asset_jobs = []
schedule_jobs = []
schedules = []
for r in ["00", "06", "12", "18"]:
for model in ["global", "eu"]:
for delay in [0, 1]:
Expand All @@ -36,31 +32,45 @@ def build_config_on_runtime(model, run, delay=0):
)
match (delay, r):
case (0, "00"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *"))
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *"))
case (0, "06"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *"))
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *"))
case (0, "12"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *"))
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *"))
case (0, "18"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *"))
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *"))
case (1, "00"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *"))
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *"))
case (1, "06"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *"))
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *"))
case (1, "12"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *"))
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *"))
case (1, "18"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *"))
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *"))

asset_jobs.append(asset_job)

@job(config=RunConfig(
ops={"nwp_consumer_docker_op": NWPConsumerConfig(
date_from="2021-01-01",
date_to="2021-01-01",
source="ecmwf-mars"
)}
))
def get_ecmwf_data():
@job
def nwp_consumer_docker_job():
nwp_consumer_docker_op()

@schedule(job=nwp_consumer_docker_job, cron_schedule="0 13 * * *")
def ecmwf_daily_local_archive_schedule(context: ScheduleEvaluationContext):
    """Build the run request for the daily ECMWF MARS local-archive job.

    Evaluated on the "0 13 * * *" cron. The scheduled execution time is
    formatted as YYYY-MM-DD and used for both ``date_from`` and ``date_to``,
    so each run covers the single day it was scheduled for. The same date
    string is attached to the run as the ``date`` tag.

    Args:
        context: Schedule evaluation context supplied by Dagster; only
            ``scheduled_execution_time`` is read.

    Returns:
        A ``RunRequest`` configuring ``nwp_consumer_docker_op`` for the
        scheduled date, with the host ECMWF directories mounted into the
        container.
    """
    # RunRequest is not among the names this module imports from dagster at
    # file level; import it here so evaluating the schedule does not raise
    # NameError.
    from dagster import RunRequest

    scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")

    op_config = NWPConsumerConfig(
        date_from=scheduled_date,
        date_to=scheduled_date,
        source="ecmwf-mars",
        # host_path:container_path pairs handed to the op as docker volumes.
        docker_volumes=[
            '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw',
            '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr',
            '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/tmp:/tmp/nwpc',
        ],
    )

    return RunRequest(
        run_key=None,
        # Wrap the Config object in RunConfig rather than nesting it inside a
        # plain dict, so Dagster converts it to its run-config mapping.
        run_config=RunConfig(ops={"nwp_consumer_docker_op": op_config}),
        tags={"date": scheduled_date},
    )

schedules.append(ecmwf_daily_local_archive_schedule)

10 changes: 5 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ dependencies = [
"dagster-cloud == 1.4.11",
"dagster-docker == 0.20.11",
"huggingface-hub == 0.16.4",
"numpy == 1.24.2",
"numpy >= 1.23.0",
"ocf-blosc2 == 0.0.3",
"pathlib == 1.0.1",
"requests == 2.31.0",
"xarray == 2023.2.0",
"zarr == 2.14.2",
"satip == 2.11.10",
"requests >= 2.28.0",
"xarray >= 2022.3.0",
"zarr >= 2.13.3",
"satip >= 2.11.10",
]

[project.optional-dependencies]
Expand Down

0 comments on commit 41c0611

Please sign in to comment.