Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding _check_inputs() for error calculation functions #279

Merged
merged 3 commits into from
Dec 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 151 additions & 77 deletions pypots/utils/metrics/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,57 @@
from ..logging import logger


def calc_mae(
def _check_inputs(
predictions: Union[np.ndarray, torch.Tensor, list],
targets: Union[np.ndarray, torch.Tensor, list],
masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
check_shape: bool = True,
):
# check type
assert isinstance(predictions, type(targets)), (
f"types of `predictions` and `targets` must match, but got"
f"`predictions`: {type(predictions)}, `target`: {type(targets)}"
)
lib = np if isinstance(predictions, np.ndarray) else torch
# check shape
prediction_shape = predictions.shape
target_shape = targets.shape
if check_shape:
assert (
prediction_shape == target_shape
), f"shape of `predictions` and `targets` must match, but got {prediction_shape} and {target_shape}"
# check NaN
assert not lib.isnan(
predictions
).any(), "`predictions` mustn't contain NaN values, but detected NaN in it"
assert not lib.isnan(
targets
).any(), "`targets` mustn't contain NaN values, but detected NaN in it"

if masks is not None:
# check type
assert isinstance(masks, type(targets)), (
f"types of `masks`, `predictions`, and `targets` must match, but got"
f"`masks`: {type(masks)}, `targets`: {type(targets)}"
)
# check shape, masks shape must match targets
mask_shape = masks.shape
assert mask_shape == target_shape, (
f"shape of `masks` must match `targets` shape, "
f"but got `mask`: {mask_shape} that is different from `targets`: {target_shape}"
)
# check NaN
assert not lib.isnan(
masks
).any(), "`masks` mustn't contain NaN values, but detected NaN in it"

return lib


def calc_mae(
predictions: Union[np.ndarray, torch.Tensor],
targets: Union[np.ndarray, torch.Tensor],
masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
) -> Union[float, torch.Tensor]:
"""Calculate the Mean Absolute Error between ``predictions`` and ``targets``.
``masks`` can be used for filtering. For values==0 in ``masks``,
Expand Down Expand Up @@ -55,23 +102,10 @@ def calc_mae(
so the result is 1/2=0.5.

"""
assert isinstance(predictions, type(targets)), (
f"types of inputs and target must match, but got"
f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
)
prediction_shape = predictions.shape
target_shape = targets.shape
assert (
prediction_shape == target_shape
), f"shape of predictions and targets must match, but got {prediction_shape} and {target_shape} "
# check shapes and values of inputs
lib = _check_inputs(predictions, targets, masks)

lib = np if isinstance(predictions, np.ndarray) else torch
if masks is not None:
mask_shape = masks.shape
assert (
mask_shape == target_shape
), f"shape of masks must match predictions' shape, but got {mask_shape} and {prediction_shape} "

return lib.sum(lib.abs(predictions - targets) * masks) / (
lib.sum(masks) + 1e-12
)
Expand All @@ -80,9 +114,9 @@ def calc_mae(


def calc_mse(
predictions: Union[np.ndarray, torch.Tensor, list],
targets: Union[np.ndarray, torch.Tensor, list],
masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
predictions: Union[np.ndarray, torch.Tensor],
targets: Union[np.ndarray, torch.Tensor],
masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
) -> Union[float, torch.Tensor]:
"""Calculate the Mean Square Error between ``predictions`` and ``targets``.
``masks`` can be used for filtering. For values==0 in ``masks``,
Expand Down Expand Up @@ -121,23 +155,10 @@ def calc_mse(
so the result is 1/2=0.5.

"""
# check shapes and values of inputs
lib = _check_inputs(predictions, targets, masks)

assert isinstance(predictions, type(targets)), (
f"types of inputs and target must match, but got"
f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
)
prediction_shape = predictions.shape
target_shape = targets.shape
assert (
prediction_shape == target_shape
), f"shape of predictions and targets must match, but got {prediction_shape} and {target_shape} "

lib = np if isinstance(predictions, np.ndarray) else torch
if masks is not None:
mask_shape = masks.shape
assert (
mask_shape == target_shape
), f"shape of masks must match predictions' shape, but got {mask_shape} and {prediction_shape} "
return lib.sum(lib.square(predictions - targets) * masks) / (
lib.sum(masks) + 1e-12
)
Expand All @@ -146,9 +167,9 @@ def calc_mse(


def calc_rmse(
predictions: Union[np.ndarray, torch.Tensor, list],
targets: Union[np.ndarray, torch.Tensor, list],
masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
predictions: Union[np.ndarray, torch.Tensor],
targets: Union[np.ndarray, torch.Tensor],
masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
) -> Union[float, torch.Tensor]:
"""Calculate the Root Mean Square Error between ``predictions`` and ``targets``.
``masks`` can be used for filtering. For values==0 in ``masks``,
Expand Down Expand Up @@ -188,18 +209,15 @@ def calc_rmse(
so the result is :math:`\\sqrt{1/2}=0.5`.

"""
assert isinstance(predictions, type(targets)), (
f"types of inputs and target must match, but got"
f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
)
# don't have to check types and NaN here, since calc_mse() will do it
lib = np if isinstance(predictions, np.ndarray) else torch
return lib.sqrt(calc_mse(predictions, targets, masks))


def calc_mre(
predictions: Union[np.ndarray, torch.Tensor, list],
targets: Union[np.ndarray, torch.Tensor, list],
masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
predictions: Union[np.ndarray, torch.Tensor],
targets: Union[np.ndarray, torch.Tensor],
masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
) -> Union[float, torch.Tensor]:
"""Calculate the Mean Relative Error between ``predictions`` and ``targets``.
``masks`` can be used for filtering. For values==0 in ``masks``,
Expand Down Expand Up @@ -239,22 +257,10 @@ def calc_mre(
so the result is :math:`\\sqrt{1/2}=0.5`.

"""
assert isinstance(predictions, type(targets)), (
f"types of inputs and target must match, but got"
f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
)
prediction_shape = predictions.shape
target_shape = targets.shape
assert (
prediction_shape == target_shape
), f"shape of predictions and targets must match, but got {prediction_shape} and {target_shape} "
# check shapes and values of inputs
lib = _check_inputs(predictions, targets, masks)

lib = np if isinstance(predictions, np.ndarray) else torch
if masks is not None:
mask_shape = masks.shape
assert (
mask_shape == target_shape
), f"shape of masks must match predictions' shape, but got {mask_shape} and {prediction_shape} "
return lib.sum(lib.abs(predictions - targets) * masks) / (
lib.sum(lib.abs(targets * masks)) + 1e-12
)
Expand All @@ -273,51 +279,119 @@ def calc_quantile_loss(predictions, targets, q: float, eval_points) -> float:
return quantile_loss


def calc_quantile_crps(predictions, targets, eval_points, mean_scaler=0, scaler=1):
"""Continuous rank probability score for distributional predictions."""
def calc_quantile_crps(
predictions: Union[np.ndarray, torch.Tensor],
targets: Union[np.ndarray, torch.Tensor],
masks: Union[np.ndarray, torch.Tensor],
scaler_mean=0,
scaler_stddev=1,
) -> float:
"""Continuous rank probability score for distributional predictions.

Parameters
----------
predictions :
The prediction data to be evaluated.

targets :
The target data for helping evaluate the predictions.

masks :
The masks for filtering the specific values in inputs and target from evaluation.
Only values at corresponding positions where values ==1 in ``masks`` will be used for evaluation.

scaler_mean:
Mean value of the scaler used to scale the data.

scaler_stddev:
Standard deviation value of the scaler used to scale the data.

Returns
-------
CRPS :
Value of continuous rank probability score.

"""
# check shapes and values of inputs
_ = _check_inputs(predictions, targets, masks, check_shape=False)

if isinstance(predictions, np.ndarray):
predictions = torch.from_numpy(predictions)
if isinstance(targets, np.ndarray):
targets = torch.from_numpy(targets)
if isinstance(eval_points, np.ndarray):
eval_points = torch.from_numpy(eval_points)
if isinstance(masks, np.ndarray):
masks = torch.from_numpy(masks)

targets = targets * scaler + mean_scaler
predictions = predictions * scaler + mean_scaler
targets = targets * scaler_stddev + scaler_mean
predictions = predictions * scaler_stddev + scaler_mean

quantiles = np.arange(0.05, 1.0, 0.05)
denominator = torch.sum(torch.abs(targets * eval_points))
CRPS = 0
denominator = torch.sum(torch.abs(targets * masks))
CRPS = torch.tensor(0.0)
for i in range(len(quantiles)):
q_pred = []
for j in range(len(predictions)):
q_pred.append(torch.quantile(predictions[j : j + 1], quantiles[i], dim=1))
q_pred = torch.cat(q_pred, 0)
q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], eval_points)
q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], masks)
CRPS += q_loss / denominator
return CRPS.item() / len(quantiles)


def calc_quantile_crps_sum(predictions, targets, eval_points, mean_scaler=0, scaler=1):
"""Continuous rank probability score for distributional predictions."""
def calc_quantile_crps_sum(
predictions: Union[np.ndarray, torch.Tensor],
targets: Union[np.ndarray, torch.Tensor],
masks: Union[np.ndarray, torch.Tensor],
scaler_mean=0,
scaler_stddev=1,
) -> float:
"""Sum continuous rank probability score for distributional predictions.

Parameters
----------
predictions :
The prediction data to be evaluated.

targets :
The target data for helping evaluate the predictions.

masks :
The masks for filtering the specific values in inputs and target from evaluation.
Only values at corresponding positions where values ==1 in ``masks`` will be used for evaluation.

scaler_mean:
Mean value of the scaler used to scale the data.

scaler_stddev:
Standard deviation value of the scaler used to scale the data.

Returns
-------
CRPS :
Sum value of continuous rank probability score.

"""
# check shapes and values of inputs
_ = _check_inputs(predictions, targets, masks, check_shape=False)

if isinstance(predictions, np.ndarray):
predictions = torch.from_numpy(predictions)
if isinstance(targets, np.ndarray):
targets = torch.from_numpy(targets)
if isinstance(eval_points, np.ndarray):
eval_points = torch.from_numpy(eval_points)
if isinstance(masks, np.ndarray):
masks = torch.from_numpy(masks)

eval_points = eval_points.mean(-1)
targets = targets * scaler + mean_scaler
masks = masks.mean(-1)
targets = targets * scaler_stddev + scaler_mean
targets = targets.sum(-1)
predictions = predictions * scaler + mean_scaler
predictions = predictions * scaler_stddev + scaler_mean

quantiles = np.arange(0.05, 1.0, 0.05)
denominator = torch.sum(torch.abs(targets * eval_points))
CRPS = 0
denominator = torch.sum(torch.abs(targets * masks))
CRPS = torch.tensor(0.0)
for i in range(len(quantiles)):
q_pred = torch.quantile(predictions.sum(-1), quantiles[i], dim=1)
q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], eval_points)
q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], masks)
CRPS += q_loss / denominator
return CRPS.item() / len(quantiles)

Expand Down
Loading