Merge pull request #69 from Deltares/feature/DEI-120-add-statistics-time-aggregation-rule

Feature/dei 120 add statistics time aggregation rule
mKlapwijk authored Oct 17, 2023
2 parents b5c2436 + 0014aa0 commit 552be7f
Showing 15 changed files with 277 additions and 21 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
@@ -10,8 +10,9 @@
"editor.codeActionsOnSave": {
"source.organizeImports": true,
},
"editor.defaultFormatter": "ms-python.black-formatter",
},
"python.formatting.provider": "black",
"python.formatting.provider": "none",
"python.formatting.blackArgs": [
"--line-length=88"
],
36 changes: 25 additions & 11 deletions decoimpact/business/entities/rules/time_aggregation_rule.py
@@ -14,8 +14,8 @@
# from itertools import groupby
from typing import List

import xarray as _xr
import numpy as _np
import xarray as _xr
from xarray.core.resample import DataArrayResample

from decoimpact.business.entities.rules.i_array_based_rule import IArrayBasedRule
@@ -33,12 +33,14 @@ def __init__(
name: str,
input_variable_names: List[str],
operation_type: TimeOperationType,
operation_parameter: float = 0,
time_scale: str = "year",
output_variable_name: str = "output",
description: str = "",
):
super().__init__(name, input_variable_names, output_variable_name, description)
self._operation_type = operation_type
self._operation_parameter = operation_parameter
self._time_scale = time_scale.lower()
self._time_scale_mapping = {"month": "M", "year": "Y"}

@@ -47,6 +49,11 @@ def operation_type(self):
"""Operation type property"""
return self._operation_type

@property
def operation_parameter(self):
"""Operation parameter property"""
return self._operation_parameter

@property
def time_scale(self):
"""Time scale property"""
@@ -79,7 +86,6 @@ def validate(self, logger: ILogger) -> bool:
return valid

def execute(self, value_array: _xr.DataArray, logger: ILogger) -> _xr.DataArray:

"""Aggregates the values for the specified start and end date
Args:
@@ -136,7 +142,7 @@ def _perform_operation(self, aggregated_values: DataArrayResample) -> _xr.DataAr
period_operations = [
TimeOperationType.COUNT_PERIODS,
TimeOperationType.MAX_DURATION_PERIODS,
TimeOperationType.AVG_DURATION_PERIODS
TimeOperationType.AVG_DURATION_PERIODS,
]

if self._operation_type is TimeOperationType.ADD:
@@ -157,6 +163,14 @@ def _perform_operation(self, aggregated_values: DataArrayResample) -> _xr.DataAr
elif self._operation_type in period_operations:
result = aggregated_values.reduce(self.analyze_groups, dim="time")

elif self._operation_type is TimeOperationType.STDEV:
result = aggregated_values.std()

elif self._operation_type is TimeOperationType.PERCENTILE:
result = aggregated_values.quantile(
self._operation_parameter / 100
).drop_vars("quantile")

else:
raise NotImplementedError(
f"The operation type '{self._operation_type}' "
@@ -190,15 +204,15 @@ def count_groups(self, elem):

def duration_groups(self, elem):
"""
Create an array that cumulatively sums the values of the groups in the array,
but restarts when a 0 occurs. For example, the input [0, 1, 1, 0, 1, 1, 1, 0, 1]
returns [0, 1, 2, 0, 1, 2, 3, 0, 1].
Args:
elem (List): the data array in N-dimensions
Returns:
List: List with the duration of the periods
"""
# Function to create a cumsum over the groups (where the elements in elem are 1)
cumsum_groups = _np.frompyfunc(lambda a, b: a + b if b == 1 else 0, 2, 1)
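
For illustration only (an editor's sketch, not part of the diff): applying this object ufunc with accumulate reproduces the docstring example of a cumulative sum that resets at every 0.

```python
import numpy as np

# Same ufunc as above: keep adding while the next element is 1, reset to 0 otherwise.
cumsum_groups = np.frompyfunc(lambda a, b: a + b if b == 1 else 0, 2, 1)

values = np.array([0, 1, 1, 0, 1, 1, 1, 0, 1])
print(cumsum_groups.accumulate(values, dtype=object))  # [0 1 2 0 1 2 3 0 1]
```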
@@ -248,7 +262,7 @@ def analyze_groups(self, elem, axis, **kwargs):
period,
group_count,
out=_np.zeros_like(period),
where=group_count != 0
where=group_count != 0,
)

# in case of multiple dimensions:
3 changes: 2 additions & 1 deletion decoimpact/business/workflow/model_builder.py
@@ -82,7 +82,7 @@ def _create_rule(rule_data: IRuleData) -> IRule:
[rule_data.input_variable],
rule_data.multipliers,
rule_data.output_variable,
rule_data.date_range
rule_data.date_range,
)

if isinstance(rule_data, ILayerFilterRuleData):
@@ -107,6 +107,7 @@ def _create_rule(rule_data: IRuleData) -> IRule:
rule_data.name,
[rule_data.input_variable],
rule_data.operation,
rule_data.operation_parameter,
rule_data.output_variable,
rule_data.time_scale,
)
5 changes: 5 additions & 0 deletions decoimpact/data/api/i_time_aggregation_rule_data.py
@@ -32,6 +32,11 @@ def input_variable(self) -> str:
def operation(self) -> TimeOperationType:
"""Operation type"""

@property
@abstractmethod
def operation_parameter(self) -> float:
"""Operation parameter"""

@property
@abstractmethod
def time_scale(self) -> str:
2 changes: 2 additions & 0 deletions decoimpact/data/api/time_operation_type.py
@@ -24,3 +24,5 @@ class TimeOperationType(IntEnum):
COUNT_PERIODS = 6
MAX_DURATION_PERIODS = 7
AVG_DURATION_PERIODS = 8
STDEV = 9
PERCENTILE = 10
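
The two new members correspond to the xarray reductions added to TimeAggregationRule._perform_operation above: STDEV uses std() on the resampled data and PERCENTILE uses quantile(parameter / 100). A standalone sketch with made-up data, assuming the yearly resampling the rule maps "year" to:

```python
import pandas as pd
import xarray as xr

# Six monthly values on a time axis (made-up data).
time = pd.date_range("2020-01-01", periods=6, freq="MS")
values = xr.DataArray([1.0, 2.0, 4.0, 3.0, 5.0, 9.0], coords=[("time", time)])

resampled = values.resample(time="Y")
stdev = resampled.std()                                    # TimeOperationType.STDEV
p10 = resampled.quantile(10 / 100).drop_vars("quantile")   # TimeOperationType.PERCENTILE, parameter 10
```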
7 changes: 7 additions & 0 deletions decoimpact/data/entities/time_aggregation_rule_data.py
@@ -24,6 +24,7 @@ def __init__(
self,
name: str,
operation: TimeOperationType,
operation_parameter: float,
input_variable: str,
time_scale: str = "year",
output_variable: str = "output",
@@ -32,6 +33,7 @@
super().__init__(name, output_variable, description)
self._input_variable = input_variable
self._operation = operation
self._operation_parameter = operation_parameter
self._time_scale = time_scale

@property
@@ -44,6 +46,11 @@ def operation(self) -> TimeOperationType:
"""Operation type"""
return self._operation

@property
def operation_parameter(self) -> float:
"""Operation parameter"""
return self._operation_parameter

@property
def time_scale(self) -> str:
"""Time scale type"""
35 changes: 34 additions & 1 deletion decoimpact/data/parsers/parser_time_aggregation_rule.py
@@ -37,19 +37,52 @@ def parse_dict(self, dictionary: Dict[str, Any], logger: ILogger) -> IRuleData:
Returns:
RuleBase: Rule based on the provided data
"""
# get elements
name = get_dict_element("name", dictionary)
input_variable_name = get_dict_element("input_variable", dictionary)
operation = get_dict_element("operation", dictionary)
time_scale = get_dict_element("time_scale", dictionary)
operation_parameter = None

# if operation contains percentile,
# extract percentile value as operation_parameter from operation:
if str(operation)[:10] == "PERCENTILE":
try:
operation_parameter = float(str(operation)[11:-1])
except ValueError as exc:
message = (
"Operation percentile is missing valid value like 'percentile(10)'"
)
raise ValueError(message) from exc
operation = "PERCENTILE"

# validate operation
match_operation = [o for o in TimeOperationType if o.name == operation]
operation_value = next(iter(match_operation), None)

# validate operation_value (percentile(n); n = operation_value)
if not operation_value:
message = f"Operation is not of a predefined type. Should be in: \
{[o.name for o in TimeOperationType]}. Received: {operation}"
raise ValueError(message)

# test if operation_parameter is within expected limits:
if operation_value == TimeOperationType.PERCENTILE:
if (
operation_parameter is None
or operation_parameter < 0
or operation_parameter > 100
):
message = "Operation percentile should be a number between 0 and 100."
raise ValueError(message)

output_variable_name = get_dict_element("output_variable", dictionary)

return TimeAggregationRuleData(
name, operation_value, input_variable_name, output_variable_name, time_scale
name,
operation_value,
operation_parameter,
input_variable_name,
output_variable_name,
time_scale,
)
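
A minimal standalone sketch (not part of the commit) of the percentile extraction performed in parse_dict above; the string slicing mirrors the diff:

```python
# Illustration of how a "PERCENTILE(n)" operation string is split into
# an operation name and an operation_parameter.
operation = "PERCENTILE(10)"
operation_parameter = None

if str(operation)[:10] == "PERCENTILE":
    # "PERCENTILE(10)"[11:-1] -> "10"
    operation_parameter = float(str(operation)[11:-1])
    operation = "PERCENTILE"

print(operation, operation_parameter)  # PERCENTILE 10.0
```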
2 changes: 1 addition & 1 deletion docs/tutorials/input.md
@@ -171,7 +171,7 @@ FORMAT
```

The time aggregation rule allows for calculating a statistical summary over the time axis of 3D and 2D variables. This could be used for calculating the maximum value over a year (e.g. for water level) or the minimum value over a month (e.g. oxygen concentration). The rule operates on both 3D and 2D variables as long as they have a time axis, and returns a 3D or 2D result (depending on the input) with the statistic calculated for a new time axis (e.g., year or month).
Operations available: Add, Average, Median, Min, Max and count_periods
Operations available: Add, Average, Median, Min, Max, count_periods, Stdev and Percentile(n). When using percentile, give the desired percentile in parentheses, e.g. percentile(10).

Time aggregation available: Year, Month
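
As an illustration of the new operations, the dictionaries below mirror the rule definitions used by this PR's parser tests; the field names come from those tests, the variable names are made up, and the exact layout of a full input file may differ:

```python
# Hypothetical rule definitions as ParserTimeAggregationRule.parse_dict receives them.
percentile_rule = {
    "name": "yearly_water_level_p90",
    "input_variable": "water_level",
    "operation": "PERCENTILE(90)",  # value in parentheses selects the nth percentile
    "output_variable": "water_level_p90",
    "time_scale": "year",
}

stdev_rule = {
    "name": "monthly_oxygen_stdev",
    "input_variable": "oxygen",
    "operation": "STDEV",
    "output_variable": "oxygen_stdev",
    "time_scale": "month",
}
```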

60 changes: 55 additions & 5 deletions tests/business/entities/rules/test_time_aggregation_rule.py
@@ -331,16 +331,68 @@ def test_execute_value_array_aggregate_time_monthly_median():
)


def test_execute_value_array_aggregate_time_monthly_stdev():
"""Test aggregate input_variable_names of a TimeAggregationRule
(STDEV, monthly)"""

# create test set
logger = Mock(ILogger)
rule = TimeAggregationRule(
name="test",
time_scale="month",
input_variable_names=["foo"],
operation_type=TimeOperationType.STDEV,
)

time_aggregation = rule.execute(value_array_monthly, logger)
result_data = [0.0, 0.25, 0.05]
result_array = _xr.DataArray(
result_data, coords=[result_time_monthly], dims=["time_month"]
)

# Assert
assert (
_xr.testing.assert_allclose(time_aggregation, result_array, atol=1e-11) is None
)


def test_execute_value_array_aggregate_time_monthly_percentile():
"""Test aggregate input_variable_names of a TimeAggregationRule
(PERCENTILE, monthly)"""

# create test set
logger = Mock(ILogger)
rule = TimeAggregationRule(
name="test",
time_scale="month",
input_variable_names=["foo"],
operation_type=TimeOperationType.PERCENTILE,
operation_parameter=10,
)

time_aggregation = rule.execute(value_array_monthly, logger)
result_data = [0.1, 0.25, 0.21]
result_array = _xr.DataArray(
result_data, coords=[result_time_monthly], dims=["time_month"]
)

# Assert
assert (
_xr.testing.assert_allclose(time_aggregation, result_array, atol=1e-11) is None
)


def test_operation_type_not_implemented():
"""Test that the time aggregation rule gives an error if no operation_type is given"""
"""Test that the time aggregation rule gives an error
if no operation_type is given"""

# create test set
logger = Mock(ILogger)
rule = TimeAggregationRule(
name="test",
time_scale="month",
input_variable_names=["foo"],
operation_type="test"
operation_type="test",
)

with pytest.raises(NotImplementedError) as exc_info:
@@ -349,7 +401,5 @@ def test_operation_type_not_implemented():
exception_raised = exc_info.value

# Assert
expected_message = (
"The operation type 'test' is currently not supported"
)
expected_message = "The operation type 'test' is currently not supported"
assert exception_raised.args[0] == expected_message
Binary file modified tests/data/entities/test_data_access_layer_data/results.nc
2 changes: 1 addition & 1 deletion tests/data/entities/test_time_aggregation_rule_data.py
@@ -19,7 +19,7 @@ def test_time_aggregation_rule_data_creation_logic():

# Act
data = TimeAggregationRuleData(
"test_name", TimeOperationType.MIN, "input", "output", "description"
"test_name", TimeOperationType.MIN, None, "input", "output", "description"
)

# Assert
1 change: 1 addition & 0 deletions tests/data/parsers/test_parser_combine_results_rule.py
@@ -43,6 +43,7 @@ def test_parse_dict_to_rule_data_logic():
"name": "testname",
"input_variables": ["foo", "bar"],
"operation": "Multiply",
# "operation_parameter": None,
"output_variable": "test_output_name",
"description": "test description",
}
54 changes: 54 additions & 0 deletions tests/data/parsers/test_parser_time_aggregation_rule.py
@@ -106,3 +106,57 @@ def test_parse_operation_type():
expected_message = f"Operation is not of a predefined type. Should be in: \
{[o.name for o in TimeOperationType]}. Received: Minimum"
assert exception_raised.args[0] == expected_message


def test_parse_operation_percentile_has_parameter():
"""Test if operation percentile is parsed correctly"""
# Arrange
contents = dict(
{
"name": "testname",
"input_variable": "input",
"operation": "PERCENTILE",
"output_variable": "output",
"time_scale": "year",
}
)
logger = Mock(ILogger)

# Act
data = ParserTimeAggregationRule()
with pytest.raises(ValueError) as exc_info:
data.parse_dict(contents, logger)

exception_raised = exc_info.value

# Assert
expected_message = (
"Operation percentile is missing valid value like 'percentile(10)'"
)
assert exception_raised.args[0] == expected_message


def test_parse_operation_percentile_valid_parameter():
"""Test if operation percentile is parsed correctly"""
# Arrange
contents = dict(
{
"name": "testname",
"input_variable": "input",
"operation": "PERCENTILE(999)",
"output_variable": "output",
"time_scale": "year",
}
)
logger = Mock(ILogger)

# Act
data = ParserTimeAggregationRule()
with pytest.raises(ValueError) as exc_info:
data.parse_dict(contents, logger)

exception_raised = exc_info.value

# Assert
expected_message = "Operation percentile should be a number between 0 and 100."
assert exception_raised.args[0] == expected_message
