Skip to content

Commit

Permalink
Change nwp-consumer job to call python package (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc authored Sep 15, 2023
1 parent 0df3884 commit ef66117
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 16 deletions.
34 changes: 33 additions & 1 deletion nwp/assets/ecmwf/mars.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dagster import Config, OpExecutionContext, op
import subprocess
from dagster import Config, OpExecutionContext, op, graph
from dagster_docker import execute_docker_container


Expand All @@ -7,6 +8,8 @@ class NWPConsumerConfig(Config):
date_to: str
source: str
docker_volumes: list[str]
raw_dir: str
zarr_dir: str
env_vars: list[str]

@op
Expand All @@ -27,3 +30,32 @@ def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfi

pass

@op
def nwp_consumer_download_op(context: OpExecutionContext, config: NWPConsumerConfig):
process = subprocess.run(
["nwp-consumer", "download",
f'--source={config.source}', f'--from={config.date_from}', f'--to={config.date_to}',
f'--rdir={config.raw_dir}', f'--zdir={config.zarr_dir}'],
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
text=True
)
code = process.returncode
print(process.stdout)
print(process.stderr)
return config

@op
def nwp_consumer_convert_op(context: OpExecutionContext, downloadedConfig: NWPConsumerConfig):
process = subprocess.run(
["nwp-consumer", "convert",
f'--source={downloadedConfig.source}', f'--from={downloadedConfig.date_from}',
f'--to={downloadedConfig.date_to}',
f'--rdir={downloadedConfig.raw_dir}', f'--zdir={downloadedConfig.zarr_dir}'],
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
text=True
)
print(process.stdout)
print(process.stderr)

52 changes: 38 additions & 14 deletions nwp/jobs.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
import datetime as dt
import json

from dagster import (
daily_partitioned_config,
AssetSelection,
DailyPartitionsDefinition,
ScheduleDefinition,
build_schedule_from_partitioned_job,
daily_partitioned_config,
define_asset_job,
job,
partitioned_config,
)

from nwp.assets.dwd.common import IconConfig
from nwp.assets.ecmwf.mars import nwp_consumer_docker_op
from nwp.assets.ecmwf.mars import (
NWPConsumerConfig,
nwp_consumer_convert_op,
nwp_consumer_download_op,
)

schedules = []

dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"

def build_config_on_runtime(model, run, delay=0):
def build_config_on_runtime(model: str, run: str, delay: int = 0) -> dict:
"""Create a config dict for the DWD ICON model."""
config = IconConfig(model=model,
run=run,
delay=delay,
Expand Down Expand Up @@ -61,17 +65,37 @@ def build_config_on_runtime(model, run, delay=0):


@daily_partitioned_config(start_date=dt.datetime(2021, 1, 1))
def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime):
return {"ops": {"nwp_consumer_docker_op": {"config": {
"date_from": start.strftime("%Y-%m-%d"),
"date_to": start.strftime("%Y-%m-%d"),
"source": "ecmwf-mars",
"env_vars": ["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"],
"docker_volumes": ['/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp']
}}}}
def ecmwf_daily_partitioned_config_docker(start: dt.datetime, _end: dt.datetime):
config: NWPConsumerConfig = NWPConsumerConfig(
date_from=start.strftime("%Y-%m-%d"),
date_to=start.strftime("%Y-%m-%d"),
source="ecmwf-mars",
env_vars=["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"],
docker_volumes=['/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp'],
zarr_dir='/tmp/zarr',
raw_dir='/tmp/raw',
)
return {"ops": {
"nwp_consumer_docker_op": {"config": json.loads(config.json())},
}}


@daily_partitioned_config(start_date=dt.datetime(2021, 1, 1))
def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime):
config: NWPConsumerConfig = NWPConsumerConfig(
date_from=start.strftime("%Y-%m-%d"),
date_to=start.strftime("%Y-%m-%d"),
source="ecmwf-mars",
env_vars=["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"],
zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr',
raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw',
)
return {"ops": {
"nwp_consumer_download_op": {"config": json.loads(config.json())},
}}
@job(config=ecmwf_daily_partitioned_config)
def ecmwf_daily_local_archive():
nwp_consumer_docker_op()
nwp_consumer_convert_op(nwp_consumer_download_op())

schedules.append(build_schedule_from_partitioned_job(ecmwf_daily_local_archive, hour_of_day=13))

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ dependencies = [
"dagster-docker == 0.20.11",
"huggingface-hub == 0.16.4",
"numpy >= 1.23.0",
"nwp-consumer >= 0.1.13",
"ocf-blosc2 == 0.0.3",
"pathlib == 1.0.1",
"requests >= 2.28.0",
"xarray >= 2022.3.0",
"zarr >= 2.13.3",
"satip >= 2.11.10",
"satip",
]

[project.optional-dependencies]
Expand Down

0 comments on commit ef66117

Please sign in to comment.