Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple ecmwf asset #4

Merged
merged 7 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
name: Python CI

on:
push:
branches: []
paths-ignore:
- 'README.md'
pull_request:
branches: []
paths-ignore:
- 'README.md'

# Specify concurrency such that only one workflow can run at a time
# * Different workflow files are not affected
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
# Define a dependencies job that runs on all branches and PRs
# * Installs dependencies and caches them
build-venv:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v3

# Restore cached virtualenv, if available
# * The pyproject.toml hash is part of the cache key, invalidating
# the cache if the file changes
- name: Restore cached virtualenv
id: restore-cache
uses: actions/cache/restore@v3
with:
path: ./venv
key: ${{ runner.os }}-venv-${{ hashFiles('**/pyproject.toml') }}

# Should mirror the build-venv stage in the Containerfile
- name: Build venv
run: |
apt -qq update && apt -qq install -y build-essential
python -m venv ./venv
./venv/bin/pip install --upgrade -q pip wheel setuptools
if: steps.restore-cache.outputs.cache-hit != 'true'

# Should mirror the build-reqs stage in the Containerfile
# * Except this installs the dev dependencies and binaries as well
- name: Install all dependencies
run: |
./venv/bin/pip install -q .[dev]
if: steps.restore-cache.outputs.cache-hit != 'true'

# Cache the virtualenv for future runs
- name: Cache virtualenv
uses: actions/cache/save@v3
with:
path: ./venv
key: ${{ steps.restore-cache.outputs.cache-primary-key }}
if: steps.restore-cache.outputs.cache-hit != 'true'

# Define a unittest job that runs on all branches and PRs
test-unit:
runs-on: ubuntu-latest
needs: build-venv

steps:
- name: Checkout repository
uses: actions/checkout@v3

# Restore cached virtualenv
- name: Restore cached virtualenv
uses: actions/cache/restore@v3
with:
path: ./venv
key: ${{ runner.os }}-venv-${{ hashFiles('**/pyproject.toml') }}

# Run unittests
# * Produce JUnit XML report
- name: Run unit tests
run: ./venv/bin/python -m pytest --junitxml=ut-report.xml dags_tests

# Create test summary to be visualised on the job summary screen on GitHub
# * Runs even if previous steps fail
- name: Create test summary
uses: test-summary/action@v2
with:
paths: "*t-report.xml"
show: "fail, skip"
if: always()

10 changes: 10 additions & 0 deletions dags_tests/compile_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dagster import Definitions, load_assets_from_modules
from nwp import assets, jobs

def test_compiles():
all_assets = load_assets_from_modules([assets])
defs = Definitions(
assets=all_assets,
jobs=jobs.asset_jobs,
schedules=jobs.schedule_jobs
)
Binary file modified nwp/__pycache__/__init__.cpython-310.pyc
Binary file not shown.
Binary file modified nwp/__pycache__/jobs.cpython-310.pyc
Binary file not shown.
1 change: 1 addition & 0 deletions nwp/assets/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from nwp.assets.dwd.archive_to_hf import download_model_files, process_model_files, upload_model_files_to_hf
from nwp.assets.ecmwf.mars import download_mars_file
12 changes: 4 additions & 8 deletions nwp/assets/dwd/archive_to_hf.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from dagster import asset # import the `dagster` library
from nwp.assets.dwd.common import IconConfig

import os
import shutil
from glob import glob

import xarray as xr
import zarr
from dagster import asset # import the `dagster` library
from huggingface_hub import HfApi
from ocf_blosc2 import Blosc2
import shutil

from nwp.assets.dwd.common import IconConfig
from nwp.assets.dwd.consts import (
EU_PRESSURE_LEVELS,
EU_VAR2D_LIST,
Expand All @@ -34,10 +33,7 @@ def does_files_exist(config, now_datetime):
f"{now_datetime.year}{str(now_datetime.month).zfill(2)}{str(now_datetime.day).zfill(2)}"
f"_{str(now_datetime.hour).zfill(2)}.zarr.zip"
# Check if the current run exists or not
if path_in_repo in existing_files:
return True
else:
return False
return path_in_repo in existing_files


@asset
Expand Down
4 changes: 2 additions & 2 deletions nwp/assets/dwd/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import timedelta

from dagster import Config
from datetime import datetime, timedelta
import os


class IconConfig(Config):
Expand Down
30 changes: 13 additions & 17 deletions nwp/assets/dwd/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Utilities for downloading the DWD ICON models"""
"""Utilities for downloading the DWD ICON models."""
import bz2
import os
from datetime import datetime, timedelta
Expand All @@ -9,13 +9,14 @@


def get_run(run: str, delay: int = 0):
"""
Get run name
"""Get run name.

Args:
----
run: Run number

Returns:
-------
Run date and run number
"""
now = datetime.now() - timedelta(days=delay)
Expand Down Expand Up @@ -50,12 +51,10 @@ def find_file_name(
if (vars_2d is None) and (vars_3d is None):
raise ValueError("You need to specify at least one 2D or one 3D variable")

if vars_2d is not None:
if type(vars_2d) is not list:
vars_2d = [vars_2d]
if vars_3d is not None:
if type(vars_3d) is not list:
vars_3d = [vars_3d]
if vars_2d is not None and type(vars_2d) is not list:
vars_2d = [vars_2d]
if vars_3d is not None and type(vars_3d) is not list:
vars_3d = [vars_3d]

urls = []
for f_time in f_times:
Expand Down Expand Up @@ -84,13 +83,9 @@ def find_file_name(

def download_extract_files(urls: list, folder: str):
"""Given a list of urls download and bunzip2 them.
Return a list of the path of the extracted files
Return a list of the path of the extracted files.
"""

if type(urls) is list:
urls_list = urls
else:
urls_list = [urls]
urls_list = urls if type(urls) is list else [urls]

# We only parallelize if we have a number of files
# larger than the cpu count
Expand All @@ -108,13 +103,14 @@ def download_extract_files(urls: list, folder: str):


def download_extract_url(url_and_folder):
"""
Download and extract url if file isn't already downloaded
"""Download and extract url if file isn't already downloaded.

Args:
----
url_and_folder: Tuple of URL and folder

Returns:
-------

"""
url, folder = url_and_folder
Expand Down
4 changes: 0 additions & 4 deletions nwp/assets/ecmwf/consts.py

This file was deleted.

25 changes: 25 additions & 0 deletions nwp/assets/ecmwf/mars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

from dagster import Output, asset
from ecmwfapi import ECMWFService

server = ECMWFService("mars")


@asset
def download_mars_file():
server.execute(
req={
"class": "od",
"date": "20230815/to/20230816",
"expver": "1",
"levtype": "sfc",
"param": "28.228/49.128/123.128/165.128/166.128/239.228/246.228/247.228",
"step": "0/t0/48/by/1",
"stream": "oper",
"time": "00:00:00,12:00:00",
"type": "fc",
},
target="20230815.grib"
)

return Output(None, metadata={"filepath": "20230815.grib"})
4 changes: 2 additions & 2 deletions nwp/jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dagster import job, schedule, ScheduleEvaluationContext, RunRequest, RunConfig, materialize, ScheduleDefinition, op
from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule

from nwp.assets.dwd.common import IconConfig
from dagster import AssetSelection, define_asset_job

base_path = "/mnt/storage_b/data/ocf/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"

Expand Down
26 changes: 26 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ authors = [
]
classifiers = ["Programming Language :: Python :: 3"]
dependencies = [
"ecmwf-api-client == 1.6.3",
"dagit == 1.4.11",
"dagster == 1.4.11",
"dagster-cloud == 1.4.11",
"huggingface-hub == 0.16.4",
"numpy == 1.24.2",
"ocf-blosc2 == 0.0.3",
"pathlib == 1.0.1",
Expand All @@ -40,4 +42,28 @@ dev = [
[tool.setuptools.packages.find]
exclude = ["*_tests"]

# Ruff configuration
# * See https://beta.ruff.rs/docs/
[tool.ruff]
select = [
"F", # pyflakes
"E", # pycodestyle
"W", # whitespace and newlines
"I", # isort
"UP", # modernize
"ANN", # flake8 type annotations
"S", # flake8 bandit
"B", # flake8 bugbear
"C4", # flake8 comprehensions
"T20", # flake8 print
"SIM", # flake8 simplify
"ARG", # flake8 unused arguments
"D", # pydocstyle
]
line-length = 100
ignore = ["D203", "D213", "ANN101"]
exclude = ["__init__.py"]

[tool.ruff.per-file-ignores]
"test*" = ["D", "ANN"]

4 changes: 0 additions & 4 deletions sat/assets.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import json

import pandas as pd
import requests

from dagster import AssetExecutionContext, MetadataValue, asset