Adding _check_inputs() for error calculation functions (#279)
* feat: add _check_inputs() for all error metric calculation functions;

* fix: use the corresponding lib to detect NaN, otherwise loss calculation during torch training will raise:

  RuntimeError: Can't call numpy() on Tensor that requires grad. Use tensor.detach().numpy() instead.

* fix: don't check shapes for CRPS calc funcs;
WenjieDu authored Dec 21, 2023
1 parent 5dc5ba3 commit 9c5c380
Showing 1 changed file with 151 additions and 77 deletions.
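The second fix above is reproducible in isolation: calling `np.isnan()` on a tensor that participates in autograd forces an implicit `Tensor.numpy()` conversion, which raises exactly the RuntimeError quoted in the commit message. A minimal sketch of the failure and the type-dispatch workaround the commit adopts (variable names here are illustrative):

```python
import numpy as np
import torch

predictions = torch.randn(2, 3, requires_grad=True)

try:
    # numpy coerces the tensor via Tensor.numpy(), which autograd forbids
    np.isnan(predictions).any()
except RuntimeError as err:
    print(err)  # Can't call numpy() on Tensor that requires grad. ...

# dispatch on the input type instead, as _check_inputs() does below
lib = np if isinstance(predictions, np.ndarray) else torch
assert not lib.isnan(predictions).any()
```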
228 changes: 151 additions & 77 deletions pypots/utils/metrics/error.py
@@ -13,10 +13,57 @@
 from ..logging import logger
 
 
-def calc_mae(
+def _check_inputs(
     predictions: Union[np.ndarray, torch.Tensor, list],
     targets: Union[np.ndarray, torch.Tensor, list],
     masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
+    check_shape: bool = True,
+):
+    # check type
+    assert isinstance(predictions, type(targets)), (
+        f"types of `predictions` and `targets` must match, but got"
+        f"`predictions`: {type(predictions)}, `target`: {type(targets)}"
+    )
+    lib = np if isinstance(predictions, np.ndarray) else torch
+    # check shape
+    prediction_shape = predictions.shape
+    target_shape = targets.shape
+    if check_shape:
+        assert (
+            prediction_shape == target_shape
+        ), f"shape of `predictions` and `targets` must match, but got {prediction_shape} and {target_shape}"
+    # check NaN
+    assert not lib.isnan(
+        predictions
+    ).any(), "`predictions` mustn't contain NaN values, but detected NaN in it"
+    assert not lib.isnan(
+        targets
+    ).any(), "`targets` mustn't contain NaN values, but detected NaN in it"
+
+    if masks is not None:
+        # check type
+        assert isinstance(masks, type(targets)), (
+            f"types of `masks`, `predictions`, and `targets` must match, but got"
+            f"`masks`: {type(masks)}, `targets`: {type(targets)}"
+        )
+        # check shape, masks shape must match targets
+        mask_shape = masks.shape
+        assert mask_shape == target_shape, (
+            f"shape of `masks` must match `targets` shape, "
+            f"but got `mask`: {mask_shape} that is different from `targets`: {target_shape}"
+        )
+        # check NaN
+        assert not lib.isnan(
+            masks
+        ).any(), "`masks` mustn't contain NaN values, but detected NaN in it"
+
+    return lib
+
+
+def calc_mae(
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
 ) -> Union[float, torch.Tensor]:
     """Calculate the Mean Absolute Error between ``predictions`` and ``targets``.
     ``masks`` can be used for filtering. For values==0 in ``masks``,
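A quick sketch of how the new helper behaves, assuming it is imported from `pypots.utils.metrics.error` (the calls below are illustrative, not part of the commit):

```python
import numpy as np
import torch

from pypots.utils.metrics.error import _check_inputs

# returns the matching array library so callers stay backend-agnostic
assert _check_inputs(np.ones((2, 3)), np.zeros((2, 3))) is np
assert _check_inputs(torch.ones(2, 3), torch.zeros(2, 3), torch.ones(2, 3)) is torch

# mixed types, mismatched shapes, or NaNs trip the corresponding assertion
try:
    _check_inputs(np.ones((2, 3)), torch.zeros(2, 3))
except AssertionError as err:
    print(err)
```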
@@ -55,23 +102,10 @@ def calc_mae(
     so the result is 1/2=0.5.
 
     """
-    assert isinstance(predictions, type(targets)), (
-        f"types of inputs and target must match, but got"
-        f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
-    )
-    prediction_shape = predictions.shape
-    target_shape = targets.shape
-    assert (
-        prediction_shape == target_shape
-    ), f"shape of predictions and targets must match, but got {prediction_shape} and {target_shape} "
+    # check shapes and values of inputs
+    lib = _check_inputs(predictions, targets, masks)
 
-    lib = np if isinstance(predictions, np.ndarray) else torch
     if masks is not None:
-        mask_shape = masks.shape
-        assert (
-            mask_shape == target_shape
-        ), f"shape of masks must match predictions' shape, but got {mask_shape} and {prediction_shape} "
-
         return lib.sum(lib.abs(predictions - targets) * masks) / (
             lib.sum(masks) + 1e-12
         )
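Usage is unchanged for callers; a minimal example consistent with the docstring's worked case (input values assumed for illustration):

```python
import numpy as np

from pypots.utils.metrics.error import calc_mae

predictions = np.array([1.0, 2.0])
targets = np.array([1.0, 1.0])
masks = np.ones(2)  # evaluate both positions

# sum(|[0, 1]| * masks) / (sum(masks) + 1e-12) = 1 / 2 = 0.5
print(calc_mae(predictions, targets, masks))
```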
@@ -80,9 +114,9 @@ def calc_mae(
 
 
 def calc_mse(
-    predictions: Union[np.ndarray, torch.Tensor, list],
-    targets: Union[np.ndarray, torch.Tensor, list],
-    masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
 ) -> Union[float, torch.Tensor]:
     """Calculate the Mean Square Error between ``predictions`` and ``targets``.
     ``masks`` can be used for filtering. For values==0 in ``masks``,
@@ -121,23 +155,10 @@ def calc_mse(
     so the result is 1/2=0.5.
 
     """
+    # check shapes and values of inputs
+    lib = _check_inputs(predictions, targets, masks)
 
-    assert isinstance(predictions, type(targets)), (
-        f"types of inputs and target must match, but got"
-        f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
-    )
-    prediction_shape = predictions.shape
-    target_shape = targets.shape
-    assert (
-        prediction_shape == target_shape
-    ), f"shape of predictions and targets must match, but got {prediction_shape} and {target_shape} "
-
-    lib = np if isinstance(predictions, np.ndarray) else torch
     if masks is not None:
-        mask_shape = masks.shape
-        assert (
-            mask_shape == target_shape
-        ), f"shape of masks must match predictions' shape, but got {mask_shape} and {prediction_shape} "
         return lib.sum(lib.square(predictions - targets) * masks) / (
             lib.sum(masks) + 1e-12
         )
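The `+ 1e-12` term guards against an all-zero mask, which would otherwise make the denominator 0 and the division produce NaN; a small sketch (inputs invented for illustration):

```python
import numpy as np

from pypots.utils.metrics.error import calc_mse

predictions = np.array([1.0, 2.0])
targets = np.array([1.0, 1.0])

# every position masked out: numerator and denominator are both 0,
# so the epsilon turns the result into 0.0 rather than NaN
print(calc_mse(predictions, targets, np.zeros(2)))  # 0.0
```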
@@ -146,9 +167,9 @@ def calc_mse(
 
 
 def calc_rmse(
-    predictions: Union[np.ndarray, torch.Tensor, list],
-    targets: Union[np.ndarray, torch.Tensor, list],
-    masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
 ) -> Union[float, torch.Tensor]:
     """Calculate the Root Mean Square Error between ``predictions`` and ``targets``.
     ``masks`` can be used for filtering. For values==0 in ``masks``,
@@ -188,18 +209,15 @@ def calc_rmse(
     so the result is :math:`\\sqrt{1/2}=0.5`.
 
     """
-    assert isinstance(predictions, type(targets)), (
-        f"types of inputs and target must match, but got"
-        f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
-    )
+    # don't have to check types and NaN here, since calc_mse() will do it
     lib = np if isinstance(predictions, np.ndarray) else torch
     return lib.sqrt(calc_mse(predictions, targets, masks))
 
 
 def calc_mre(
-    predictions: Union[np.ndarray, torch.Tensor, list],
-    targets: Union[np.ndarray, torch.Tensor, list],
-    masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
 ) -> Union[float, torch.Tensor]:
     """Calculate the Mean Relative Error between ``predictions`` and ``targets``.
     ``masks`` can be used for filtering. For values==0 in ``masks``,
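Since `calc_rmse()` now simply delegates to `calc_mse()`, the identity RMSE = sqrt(MSE) holds by construction; a quick sanity check (values invented for illustration):

```python
import numpy as np

from pypots.utils.metrics.error import calc_mse, calc_rmse

predictions = np.array([1.0, 2.0])
targets = np.array([1.0, 1.0])
masks = np.ones(2)

assert np.isclose(
    calc_rmse(predictions, targets, masks),
    np.sqrt(calc_mse(predictions, targets, masks)),
)
```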
@@ -239,22 +257,10 @@ def calc_mre(
     so the result is :math:`\\sqrt{1/2}=0.5`.
 
     """
-    assert isinstance(predictions, type(targets)), (
-        f"types of inputs and target must match, but got"
-        f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
-    )
-    prediction_shape = predictions.shape
-    target_shape = targets.shape
-    assert (
-        prediction_shape == target_shape
-    ), f"shape of predictions and targets must match, but got {prediction_shape} and {target_shape} "
+    # check shapes and values of inputs
+    lib = _check_inputs(predictions, targets, masks)
 
-    lib = np if isinstance(predictions, np.ndarray) else torch
     if masks is not None:
-        mask_shape = masks.shape
-        assert (
-            mask_shape == target_shape
-        ), f"shape of masks must match predictions' shape, but got {mask_shape} and {prediction_shape} "
         return lib.sum(lib.abs(predictions - targets) * masks) / (
             lib.sum(lib.abs(targets * masks)) + 1e-12
         )
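`calc_mre()` normalizes the masked absolute error by the masked magnitude of the targets; with the same toy inputs (invented for illustration):

```python
import numpy as np

from pypots.utils.metrics.error import calc_mre

predictions = np.array([1.0, 2.0])
targets = np.array([1.0, 1.0])
masks = np.ones(2)

# sum(|pred - target| * masks) / sum(|target * masks|) = 1 / 2 = 0.5
print(calc_mre(predictions, targets, masks))
```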
@@ -273,51 +279,119 @@ def calc_quantile_loss(predictions, targets, q: float, eval_points) -> float:
     return quantile_loss
 
 
-def calc_quantile_crps(predictions, targets, eval_points, mean_scaler=0, scaler=1):
-    """Continuous rank probability score for distributional predictions."""
+def calc_quantile_crps(
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Union[np.ndarray, torch.Tensor],
+    scaler_mean=0,
+    scaler_stddev=1,
+) -> float:
+    """Continuous rank probability score for distributional predictions.
+
+    Parameters
+    ----------
+    predictions :
+        The prediction data to be evaluated.
+
+    targets :
+        The target data for helping evaluate the predictions.
+
+    masks :
+        The masks for filtering the specific values in inputs and target from evaluation.
+        Only values at corresponding positions where values ==1 in ``masks`` will be used for evaluation.
+
+    scaler_mean:
+        Mean value of the scaler used to scale the data.
+
+    scaler_stddev:
+        Standard deviation value of the scaler used to scale the data.
+
+    Returns
+    -------
+    CRPS :
+        Value of continuous rank probability score.
+
+    """
+    # check shapes and values of inputs
+    _ = _check_inputs(predictions, targets, masks, check_shape=False)
 
     if isinstance(predictions, np.ndarray):
         predictions = torch.from_numpy(predictions)
     if isinstance(targets, np.ndarray):
         targets = torch.from_numpy(targets)
-    if isinstance(eval_points, np.ndarray):
-        eval_points = torch.from_numpy(eval_points)
+    if isinstance(masks, np.ndarray):
+        masks = torch.from_numpy(masks)
 
-    targets = targets * scaler + mean_scaler
-    predictions = predictions * scaler + mean_scaler
+    targets = targets * scaler_stddev + scaler_mean
+    predictions = predictions * scaler_stddev + scaler_mean
 
     quantiles = np.arange(0.05, 1.0, 0.05)
-    denominator = torch.sum(torch.abs(targets * eval_points))
-    CRPS = 0
+    denominator = torch.sum(torch.abs(targets * masks))
+    CRPS = torch.tensor(0.0)
     for i in range(len(quantiles)):
         q_pred = []
         for j in range(len(predictions)):
             q_pred.append(torch.quantile(predictions[j : j + 1], quantiles[i], dim=1))
         q_pred = torch.cat(q_pred, 0)
-        q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], eval_points)
+        q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], masks)
         CRPS += q_loss / denominator
     return CRPS.item() / len(quantiles)
 
 
-def calc_quantile_crps_sum(predictions, targets, eval_points, mean_scaler=0, scaler=1):
-    """Continuous rank probability score for distributional predictions."""
+def calc_quantile_crps_sum(
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Union[np.ndarray, torch.Tensor],
+    scaler_mean=0,
+    scaler_stddev=1,
+) -> float:
+    """Sum continuous rank probability score for distributional predictions.
+
+    Parameters
+    ----------
+    predictions :
+        The prediction data to be evaluated.
+
+    targets :
+        The target data for helping evaluate the predictions.
+
+    masks :
+        The masks for filtering the specific values in inputs and target from evaluation.
+        Only values at corresponding positions where values ==1 in ``masks`` will be used for evaluation.
+
+    scaler_mean:
+        Mean value of the scaler used to scale the data.
+
+    scaler_stddev:
+        Standard deviation value of the scaler used to scale the data.
+
+    Returns
+    -------
+    CRPS :
+        Sum value of continuous rank probability score.
+
+    """
+    # check shapes and values of inputs
+    _ = _check_inputs(predictions, targets, masks, check_shape=False)
 
     if isinstance(predictions, np.ndarray):
         predictions = torch.from_numpy(predictions)
     if isinstance(targets, np.ndarray):
         targets = torch.from_numpy(targets)
-    if isinstance(eval_points, np.ndarray):
-        eval_points = torch.from_numpy(eval_points)
+    if isinstance(masks, np.ndarray):
+        masks = torch.from_numpy(masks)
 
-    eval_points = eval_points.mean(-1)
-    targets = targets * scaler + mean_scaler
+    masks = masks.mean(-1)
+    targets = targets * scaler_stddev + scaler_mean
     targets = targets.sum(-1)
-    predictions = predictions * scaler + mean_scaler
+    predictions = predictions * scaler_stddev + scaler_mean
 
     quantiles = np.arange(0.05, 1.0, 0.05)
-    denominator = torch.sum(torch.abs(targets * eval_points))
-    CRPS = 0
+    denominator = torch.sum(torch.abs(targets * masks))
+    CRPS = torch.tensor(0.0)
     for i in range(len(quantiles)):
         q_pred = torch.quantile(predictions.sum(-1), quantiles[i], dim=1)
-        q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], eval_points)
+        q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], masks)
         CRPS += q_loss / denominator
     return CRPS.item() / len(quantiles)
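Shapes are intentionally left unchecked for the CRPS functions (`check_shape=False`) because `predictions` carries an extra sample dimension that `torch.quantile(..., dim=1)` reduces inside the loop. A usage sketch with shapes inferred from that loop (all values random, for illustration only):

```python
import torch

from pypots.utils.metrics.error import calc_quantile_crps

n_batch, n_samples, n_steps, n_features = 4, 10, 12, 5
predictions = torch.randn(n_batch, n_samples, n_steps, n_features)
targets = torch.randn(n_batch, n_steps, n_features)
masks = (torch.rand(n_batch, n_steps, n_features) > 0.3).float()

# a float: the quantile losses averaged over the 19 quantile levels
print(calc_quantile_crps(predictions, targets, masks))
```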
