diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..b9afde2 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,90 @@ +name: Python CI + +on: + push: + branches: ['**'] + paths-ignore: + - 'README.md' + pull_request: + branches: ['**'] + paths-ignore: + - 'README.md' + +# Specify concurrency such that only one workflow can run at a time +# * Different workflow files are not affected +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + # Define a dependencies job that runs on all branches and PRs + # * Installs dependencies and caches them + build-venv: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + # Restore cached virtualenv, if available + # * The pyproject.toml hash is part of the cache key, invalidating + # the cache if the file changes + - name: Restore cached virtualenv + id: restore-cache + uses: actions/cache/restore@v3 + with: + path: ./venv + key: ${{ runner.os }}-venv-${{ hashFiles('**/pyproject.toml') }} + + # Should mirror the build-venv stage in the Containerfile + - name: Build venv + run: | + sudo apt -qq update && sudo apt -qq install -y build-essential + python -m venv ./venv + ./venv/bin/pip install --upgrade -q pip wheel setuptools + if: steps.restore-cache.outputs.cache-hit != 'true' + + # Should mirror the build-reqs stage in the Containerfile + # * Except this installs the dev dependencies and binaries as well + - name: Install all dependencies + run: | + ./venv/bin/pip install -q .[dev] + if: steps.restore-cache.outputs.cache-hit != 'true' + + # Cache the virtualenv for future runs + - name: Cache virtualenv + uses: actions/cache/save@v3 + with: + path: ./venv + key: ${{ steps.restore-cache.outputs.cache-primary-key }} + if: steps.restore-cache.outputs.cache-hit != 'true' + + # Define a unittest job that runs on all branches and PRs + test-unit: + runs-on: ubuntu-latest + needs: build-venv + + steps: + - name: 
Checkout repository + uses: actions/checkout@v3 + + # Restore cached virtualenv + - name: Restore cached virtualenv + uses: actions/cache/restore@v3 + with: + path: ./venv + key: ${{ runner.os }}-venv-${{ hashFiles('**/pyproject.toml') }} + + # Run unittests + # * Produce JUnit XML report + - name: Run unit tests + run: ./venv/bin/python -m pytest --junitxml=ut-report.xml dags_tests + + # Create test summary to be visualised on the job summary screen on GitHub + # * Runs even if previous steps fail + - name: Create test summary + uses: test-summary/action@v2 + with: + paths: "*t-report.xml" + show: "fail, skip" + if: always() + diff --git a/dags_tests/compile_test.py b/dags_tests/compile_test.py new file mode 100644 index 0000000..58085da --- /dev/null +++ b/dags_tests/compile_test.py @@ -0,0 +1,10 @@ +from dagster import Definitions, load_assets_from_modules +from nwp import assets, jobs + +def test_compiles(): + all_assets = load_assets_from_modules([assets]) + defs = Definitions( + assets=all_assets, + jobs=jobs.asset_jobs, + schedules=jobs.schedule_jobs + ) diff --git a/nwp/__pycache__/__init__.cpython-310.pyc b/nwp/__pycache__/__init__.cpython-310.pyc index 2716a13..8dc65d8 100644 Binary files a/nwp/__pycache__/__init__.cpython-310.pyc and b/nwp/__pycache__/__init__.cpython-310.pyc differ diff --git a/nwp/__pycache__/jobs.cpython-310.pyc b/nwp/__pycache__/jobs.cpython-310.pyc index 1d73e00..721cd79 100644 Binary files a/nwp/__pycache__/jobs.cpython-310.pyc and b/nwp/__pycache__/jobs.cpython-310.pyc differ diff --git a/nwp/assets/__init__.py b/nwp/assets/__init__.py index 3588b17..b413960 100644 --- a/nwp/assets/__init__.py +++ b/nwp/assets/__init__.py @@ -1 +1,2 @@ from nwp.assets.dwd.archive_to_hf import download_model_files, process_model_files, upload_model_files_to_hf +from nwp.assets.ecmwf.mars import download_mars_file diff --git a/nwp/assets/dwd/archive_to_hf.py b/nwp/assets/dwd/archive_to_hf.py index 56178f1..55f2101 100644 --- 
a/nwp/assets/dwd/archive_to_hf.py +++ b/nwp/assets/dwd/archive_to_hf.py @@ -1,15 +1,14 @@ -from dagster import asset # import the `dagster` library -from nwp.assets.dwd.common import IconConfig - import os +import shutil from glob import glob import xarray as xr import zarr +from dagster import asset # import the `dagster` library from huggingface_hub import HfApi from ocf_blosc2 import Blosc2 -import shutil +from nwp.assets.dwd.common import IconConfig from nwp.assets.dwd.consts import ( EU_PRESSURE_LEVELS, EU_VAR2D_LIST, @@ -34,10 +33,7 @@ def does_files_exist(config, now_datetime): f"{now_datetime.year}{str(now_datetime.month).zfill(2)}{str(now_datetime.day).zfill(2)}" f"_{str(now_datetime.hour).zfill(2)}.zarr.zip" # Check if the current run exists or not - if path_in_repo in existing_files: - return True - else: - return False + return path_in_repo in existing_files @asset diff --git a/nwp/assets/dwd/common.py b/nwp/assets/dwd/common.py index 529f406..06b428a 100644 --- a/nwp/assets/dwd/common.py +++ b/nwp/assets/dwd/common.py @@ -1,6 +1,6 @@ +from datetime import timedelta + from dagster import Config -from datetime import datetime, timedelta -import os class IconConfig(Config): diff --git a/nwp/assets/dwd/utils.py b/nwp/assets/dwd/utils.py index c816493..c481a5e 100644 --- a/nwp/assets/dwd/utils.py +++ b/nwp/assets/dwd/utils.py @@ -1,4 +1,4 @@ -"""Utilities for downloading the DWD ICON models""" +"""Utilities for downloading the DWD ICON models.""" import bz2 import os from datetime import datetime, timedelta @@ -9,13 +9,14 @@ def get_run(run: str, delay: int = 0): - """ - Get run name + """Get run name. 
Args: + ---- run: Run number Returns: + ------- Run date and run number """ now = datetime.now() - timedelta(days=delay) @@ -50,12 +51,10 @@ def find_file_name( if (vars_2d is None) and (vars_3d is None): raise ValueError("You need to specify at least one 2D or one 3D variable") - if vars_2d is not None: - if type(vars_2d) is not list: - vars_2d = [vars_2d] - if vars_3d is not None: - if type(vars_3d) is not list: - vars_3d = [vars_3d] + if vars_2d is not None and type(vars_2d) is not list: + vars_2d = [vars_2d] + if vars_3d is not None and type(vars_3d) is not list: + vars_3d = [vars_3d] urls = [] for f_time in f_times: @@ -84,13 +83,9 @@ def find_file_name( def download_extract_files(urls: list, folder: str): """Given a list of urls download and bunzip2 them. - Return a list of the path of the extracted files + Return a list of the path of the extracted files. """ - - if type(urls) is list: - urls_list = urls - else: - urls_list = [urls] + urls_list = urls if type(urls) is list else [urls] # We only parallelize if we have a number of files # larger than the cpu count @@ -108,13 +103,14 @@ def download_extract_files(urls: list, folder: str): def download_extract_url(url_and_folder): - """ - Download and extract url if file isn't already downloaded + """Download and extract url if file isn't already downloaded. 
Args: + ---- url_and_folder: Tuple of URL and folder Returns: + ------- """ url, folder = url_and_folder diff --git a/nwp/assets/ecmwf/consts.py b/nwp/assets/ecmwf/consts.py deleted file mode 100644 index e18eaab..0000000 --- a/nwp/assets/ecmwf/consts.py +++ /dev/null @@ -1,4 +0,0 @@ -GLOBAL_TIME_LIST=['00:00:00','12:00:00'] -CLASS='od' -STREAM='oper' -TYPE='fc' diff --git a/nwp/assets/ecmwf/mars.py b/nwp/assets/ecmwf/mars.py new file mode 100644 index 0000000..65a70bf --- /dev/null +++ b/nwp/assets/ecmwf/mars.py @@ -0,0 +1,25 @@ + +from dagster import Output, asset +from ecmwfapi import ECMWFService + +server = ECMWFService("mars") + + +@asset +def download_mars_file(): + server.execute( + req={ + "class": "od", + "date": "20230815/to/20230816", + "expver": "1", + "levtype": "sfc", + "param": "28.228/49.128/123.128/165.128/166.128/239.228/246.228/247.228", + "step": "0/t0/48/by/1", + "stream": "oper", + "time": "00:00:00,12:00:00", + "type": "fc", + }, + target="20230815.grib" + ) + + return Output(None, metadata={"filepath": "20230815.grib"}) diff --git a/nwp/jobs.py b/nwp/jobs.py index 2b93765..e9b16be 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -1,6 +1,6 @@ -from dagster import job, schedule, ScheduleEvaluationContext, RunRequest, RunConfig, materialize, ScheduleDefinition, op +from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule + from nwp.assets.dwd.common import IconConfig -from dagster import AssetSelection, define_asset_job base_path = "/mnt/storage_b/data/ocf/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" diff --git a/pyproject.toml b/pyproject.toml index e99d95e..72150fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,9 +18,11 @@ authors = [ ] classifiers = ["Programming Language :: Python :: 3"] dependencies = [ + "ecmwf-api-client == 1.6.3", "dagit == 1.4.11", "dagster == 1.4.11", "dagster-cloud == 1.4.11", + "huggingface-hub == 0.16.4", "numpy == 1.24.2", "ocf-blosc2 == 0.0.3", "pathlib == 
1.0.1", @@ -40,4 +42,28 @@ dev = [ [tool.setuptools.packages.find] exclude = ["*_tests"] +# Ruff configuration +# * See https://beta.ruff.rs/docs/ +[tool.ruff] +select = [ + "F", # pyflakes + "E", # pycodestyle + "W", # whitespace and newlines + "I", # isort + "UP", # modernize + "ANN", # flake8 type annotations + "S", # flake8 bandit + "B", # flake8 bugbear + "C4", # flake8 comprehensions + "T20", # flake8 print + "SIM", # flake8 simplify + "ARG", # flake8 unused arguments + "D", # pydocstyle +] +line-length = 100 +ignore = ["D203", "D213", "ANN101"] +exclude = ["__init__.py"] + +[tool.ruff.per-file-ignores] +"*test*" = ["D", "ANN"] diff --git a/sat/assets.py b/sat/assets.py index 90b7884..fd40910 100644 --- a/sat/assets.py +++ b/sat/assets.py @@ -1,8 +1,4 @@ -import json -import pandas as pd -import requests -from dagster import AssetExecutionContext, MetadataValue, asset