Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MDA8 #1224

Merged
merged 22 commits into from
Jul 2, 2024
Merged

MDA8 #1224

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,6 @@ sorted_out

# Vim temp files
*.swp


pytest.ini
23 changes: 19 additions & 4 deletions pyaerocom/colocation/colocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import traceback
import warnings
from collections import defaultdict
from datetime import datetime
from typing import Any, Callable

Expand All @@ -30,6 +31,8 @@
from pyaerocom.io import ReadCAMS2_83, ReadGridded, ReadUngridded
from pyaerocom.io.helpers import get_all_supported_ids_ungridded
from pyaerocom.io.mscw_ctm.reader import ReadMscwCtm
from pyaerocom.stats.mda8.const import MDA_VARS
from pyaerocom.stats.mda8.mda8 import mda8_colocated_data

from .colocated_data import ColocatedData
from .colocation_3d import ColocatedDataLists, colocate_vertical_profile_gridded
Expand Down Expand Up @@ -360,7 +363,7 @@ def run(self, var_list: list = None):
dictionaries comprising key / value pairs of obs variables and
associated instances of :class:`ColocatedData`.
"""
data_out = {}
data_out = defaultdict(lambda: dict())
# ToDo: see if the following could be solved via custom context manager
try:
vars_to_process = self.prepare_run(var_list)
Expand All @@ -378,9 +381,21 @@ def run(self, var_list: list = None):
coldata = self._run_helper(
mod_var, obs_var
) # note this can be ColocatedData or ColocatedDataLists
if mod_var not in data_out:
data_out[mod_var] = {}
data_out[mod_var][obs_var] = coldata

if obs_var in MDA_VARS:
mda8 = None
try:
mda8 = mda8_colocated_data(
coldata, obs_var=f"{obs_var}mda8", mod_var=f"{mod_var}mda8"
)
except ValueError:
logger.warning(
"Tried calculating mda8 for [%s, %s], but failed.", obs_var, mod_var
)
finally:
data_out[f"{mod_var}mda8"][f"{obs_var}mda8"] = mda8

self._processing_status.append([mod_var, obs_var, 1])
except Exception:
msg = f"Failed to perform analysis: {traceback.format_exc()}\n"
Expand All @@ -397,7 +412,7 @@ def run(self, var_list: list = None):
self._print_processing_status()
if self.colocation_setup.keep_data:
self.data = data_out
return data_out
return dict(data_out)

def get_nc_files_in_coldatadir(self):
"""
Expand Down
Empty file.
2 changes: 2 additions & 0 deletions pyaerocom/stats/mda8/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Variable names for which mda8 values are to be calculated.
MDA_VARS = ("conco3", "vmro3")
90 changes: 90 additions & 0 deletions pyaerocom/stats/mda8/mda8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import numpy as np
import xarray as xr

from pyaerocom.colocation.colocated_data import ColocatedData


def min_periods_max(x: np.ndarray, /, min_periods=1) -> float:
"""Calculates the max of a 1-dimensional array, returning
nan if not a minimum count of valid values exist.

:param x: 1-dimensional ndarray.
:param min_periods: minimum required non-nan values, defaults to 1
:return: A single value, which is either nan or a float.
"""
if x.ndim != 1:
raise ValueError(f"Unexpected number of dimensions. Got {x.ndim}, expected 1.")

Check warning on line 16 in pyaerocom/stats/mda8/mda8.py

View check run for this annotation

Codecov / codecov/patch

pyaerocom/stats/mda8/mda8.py#L16

Added line #L16 was not covered by tests

length = np.sum(~np.isnan(x))
if length < min_periods:
return np.nan

return np.nanmax(x)


def mda8_colocated_data(coldat: ColocatedData, /, obs_var: str, mod_var: str) -> ColocatedData:
"""Applies the mda8 calculation to a colocated data object,
returning the new colocated data object.

:param data: The colocated data object.
:return: Colocated data object containing
"""
if not isinstance(coldat, ColocatedData):
raise ValueError(f"Unexpected type {type(coldat)}. Expected ColocatedData")

Check warning on line 33 in pyaerocom/stats/mda8/mda8.py

View check run for this annotation

Codecov / codecov/patch

pyaerocom/stats/mda8/mda8.py#L33

Added line #L33 was not covered by tests

if coldat.ts_type != "hourly":
raise ValueError(f"Expected hourly timeseries. Got {coldat.ts_type}.")

# TODO: Currently order of dims matter in the implementation, so this check is
# stricter than it probably should be.
if coldat.dims != ("data_source", "time", "station_name"):
raise ValueError(

Check warning on line 41 in pyaerocom/stats/mda8/mda8.py

View check run for this annotation

Codecov / codecov/patch

pyaerocom/stats/mda8/mda8.py#L41

Added line #L41 was not covered by tests
f"Unexpected dimensions. Got {coldat.dims}, expected ['data_source', 'time', 'station_name']."
)

cd = ColocatedData(_calc_mda8(coldat.data))
cd.data.attrs["var_name"] = [obs_var, mod_var]
return cd


def _calc_mda8(data: xr.DataArray) -> xr.DataArray:
"""Calculates the daily max 8h average for an array:

:param data: The DataArray for which to calculate the mda8. Input
should be a DataArray with dimensions ["data_source", "time", "station_name"]
(ie. the format of ColocatedData.data) representing hourly data.
:return: Equivalently structured DataArray, resampled along the "time"
dimension.

Note:
-----
The calculation for mda8 is defined as follows:
> Eight hours values: 75 % of values (i.e. 6 hours)

> Maximum daily 8-hour mean: 75 % of the hourly running eight hour
averages (i.e. 18 eight hour averages per
day)

> The maximum daily eight hour mean concentration will be selected by examining
> eight hour running averages, calculated from hourly data and updated each hour.
> Each eight hour average so calculated will be assigned to the day on which it
> ends i.e. the first calculation period for any one day will be the period from
> 17:00 on the previous day to 01:00 on that day; the last calculation period for
> any one day will be the period from 16:00 to 24:00 on that day.
"""
mda8 = _daily_max(_rolling_average_8hr(data))
mda8.attrs["ts_type"] = "daily"
return mda8


def _rolling_average_8hr(arr: xr.DataArray) -> xr.DataArray:
return arr.rolling(time=8, min_periods=6).mean()


def _daily_max(arr: xr.DataArray) -> xr.DataArray:
# TODO: Base is deprecated, and using offset="1h" is the proper way to do this.
# However this currently breaks the old-dependencies test in CI. Should be
# changed in the future.
return arr.resample(time="24H", base=1).reduce(
lambda x, axis: np.apply_along_axis(min_periods_max, 1, x, min_periods=18)
)
Empty file added tests/stats/mda8/__init__.py
Empty file.
190 changes: 190 additions & 0 deletions tests/stats/mda8/test_mda8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import numpy as np
import pytest
import xarray as xr

from pyaerocom.colocation.colocated_data import ColocatedData
from pyaerocom.colocation.colocation_3d import ColocatedDataLists
from pyaerocom.stats.mda8.mda8 import (
_calc_mda8,
_daily_max,
_rolling_average_8hr,
mda8_colocated_data,
)
from tests.fixtures.collocated_data import coldata


@pytest.fixture
def test_data(time, values) -> xr.DataArray:
return xr.DataArray(
[[[x] for x in values]],
dims=["data_source", "time", "station_name"],
coords={"time": time},
)


@pytest.mark.parametrize(
"time,values,exp_mda8",
(
pytest.param(
xr.date_range(start="2024-01-01 01:00", periods=49, freq="1h"),
[0] * 49,
[0, 0, np.nan],
id="zeros",
),
pytest.param(
xr.date_range(start="2024-01-01 01:00", periods=49, freq="1h"),
np.linspace(start=1, stop=49, num=49),
[20.5, 44.5, np.nan],
id="incrementing-by-1",
),
pytest.param(
xr.date_range(start="2024-01-01 13:00:00", periods=49, freq="1h"),
np.concatenate( # Sequence 1-7, 3x nan, 11-14, 3x nan, 28-49
(
np.arange(start=1, stop=8),
[np.nan] * 3,
np.arange(start=11, stop=25),
[np.nan] * 3,
np.arange(start=28, stop=50),
)
),
[np.nan] * 3,
id="with-nans",
),
),
)
def test_calc_mda8(test_data, exp_mda8):
mda8 = _calc_mda8(test_data)

assert mda8.shape[1] == len(exp_mda8)

assert all(mda8[0, :, 0] == pytest.approx(exp_mda8, abs=0, nan_ok=True))


def test_calc_mda8_with_gap():
arr1 = xr.DataArray(
[[[x] for x in np.linspace(start=1, stop=50, num=50)]],
dims=["data_source", "time", "station_name"],
coords={"time": xr.date_range(start="2024-01-01 01:00", periods=50, freq="1h")},
)

arr2 = xr.DataArray(
[[[x] for x in np.linspace(start=1, stop=50, num=50)]],
dims=["data_source", "time", "station_name"],
coords={"time": xr.date_range(start="2024-01-04 01:00", periods=50, freq="1h")},
)

arr = xr.concat((arr1, arr2), dim="time")

mda8 = _calc_mda8(arr)

assert mda8.shape == (1, 6, 1)
pytest.approx(mda8[0, :, 0], [20.5, 44.5, np.nan, 41.25, 44.5, np.nan], abs=10 * 10**-5)


@pytest.mark.parametrize("coldataset", ("fake_3d_hr",))
def test_coldata_to_mda8(coldata):
mda8 = mda8_colocated_data(coldata, obs_var="vmro3mda8", mod_var="vmro3mda8")

assert isinstance(mda8, ColocatedData)
assert mda8.metadata["ts_type"] == "daily"
assert mda8.metadata["var_name"] == ["vmro3mda8", "vmro3mda8"]
assert mda8.shape == (2, 9, 1)

assert mda8.data.values[0, :, 0] == pytest.approx(
[
np.nan,
np.nan,
np.nan,
1.18741556,
1.18777241,
1.18869106,
1.18879322,
1.18807846,
1.18700801,
],
abs=10**-5,
nan_ok=True,
)

assert mda8.data.values[1, :, 0] == pytest.approx(
[
np.nan,
1.57327333,
1.28884431,
1.28741556,
1.28777241,
1.28869106,
1.28879322,
1.28807846,
1.28700801,
],
abs=10**-5,
nan_ok=True,
)


@pytest.mark.parametrize(
"time,values,exp_ravg",
(
(
xr.date_range(start="2024-01-01 01:00", periods=48, freq="1h"),
[0, 1, 2, 3, 4, 5, 6, 7] * 6,
[np.nan] * 5 + [2.5, 3] + [3.5] * 41,
),
(
xr.date_range(start="2024-01-01 01:00", periods=24, freq="1h"),
[np.nan, 1, 2, 3, 4, 5, 6, 7] * 3,
[np.nan] * 6 + [3.5] + [4] * 17,
),
(
xr.date_range(start="2024-01-01 01:00", periods=24, freq="1h"),
[np.nan, np.nan, 2, 3, 4, 5, 6, 7] * 3,
[np.nan] * 7 + [4.5] * 17,
),
(
xr.date_range(start="2024-01-01 01:00", periods=24, freq="1h"),
[np.nan, np.nan, np.nan, 3, 4, 5, 6, 7] * 3,
[np.nan] * 24,
),
),
)
def test_rollingaverage(test_data, exp_ravg):
"""
Test of rolling average calculation
"""
ravg = _rolling_average_8hr(test_data)

assert all(ravg[0, :, 0] == pytest.approx(exp_ravg, abs=0, nan_ok=True))


@pytest.mark.parametrize(
"time,values,exp_daily_max",
(
(
xr.date_range(start="2024-01-01 01:00", periods=48, freq="1h"),
np.linspace(1, 48, num=48),
[24, 48],
),
(
xr.date_range(start="2024-01-01 01:00", periods=24, freq="1h"),
[np.nan] * 6 + list(range(7, 25)),
[24],
),
(
xr.date_range(start="2024-01-01 01:00", periods=24, freq="1h"),
[np.nan] * 7 + list(range(8, 25)),
[np.nan],
),
),
)
def test_daily_max(test_data, exp_daily_max):
"""
Tests the daily max calculation. Note that "the first calculation period
for any one day will be the period from 17:00 on the previous day to 01:00
on that day; the last calculation period for any one day will be the period
from 16:00 to 24:00 on that day
"""
dmax = _daily_max(test_data)

assert all(dmax[0, :, 0] == pytest.approx(exp_daily_max, abs=0, nan_ok=True))