diff --git a/pypots/utils/metrics/error.py b/pypots/utils/metrics/error.py
index 9136e232..16d7c923 100644
--- a/pypots/utils/metrics/error.py
+++ b/pypots/utils/metrics/error.py
@@ -13,10 +13,57 @@
 from ..logging import logger
 
 
-def calc_mae(
+def _check_inputs(
     predictions: Union[np.ndarray, torch.Tensor, list],
     targets: Union[np.ndarray, torch.Tensor, list],
     masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
+    check_shape: bool = True,
+):
+    # check type
+    assert isinstance(predictions, type(targets)), (
+        f"types of `predictions` and `targets` must match, but got "
+        f"`predictions`: {type(predictions)}, `targets`: {type(targets)}"
+    )
+    lib = np if isinstance(predictions, np.ndarray) else torch
+    # check shape
+    prediction_shape = predictions.shape
+    target_shape = targets.shape
+    if check_shape:
+        assert (
+            prediction_shape == target_shape
+        ), f"shape of `predictions` and `targets` must match, but got {prediction_shape} and {target_shape}"
+    # check NaN
+    assert not lib.isnan(
+        predictions
+    ).any(), "`predictions` mustn't contain NaN values, but detected NaN in it"
+    assert not lib.isnan(
+        targets
+    ).any(), "`targets` mustn't contain NaN values, but detected NaN in it"
+
+    if masks is not None:
+        # check type
+        assert isinstance(masks, type(targets)), (
+            f"types of `masks`, `predictions`, and `targets` must match, but got "
+            f"`masks`: {type(masks)}, `targets`: {type(targets)}"
+        )
+        # check shape, masks shape must match targets
+        mask_shape = masks.shape
+        assert mask_shape == target_shape, (
+            f"shape of `masks` must match `targets` shape, "
+            f"but got `masks`: {mask_shape} that is different from `targets`: {target_shape}"
+        )
+        # check NaN
+        assert not lib.isnan(
+            masks
+        ).any(), "`masks` mustn't contain NaN values, but detected NaN in it"
+
+    return lib
+
+
+def calc_mae(
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
 ) -> Union[float, torch.Tensor]:
     """Calculate the Mean Absolute Error between ``predictions`` and ``targets``.
     ``masks`` can be used for filtering. For values==0 in ``masks``,
@@ -55,23 +102,10 @@ def calc_mae(
     so the result is 1/2=0.5.
 
     """
-    assert isinstance(predictions, type(targets)), (
-        f"types of inputs and target must match, but got"
-        f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
-    )
-    prediction_shape = predictions.shape
-    target_shape = targets.shape
-    assert (
-        prediction_shape == target_shape
-    ), f"shape of predictions and targets must match, but got {prediction_shape} and {target_shape} "
+    # check shapes and values of inputs
+    lib = _check_inputs(predictions, targets, masks)
 
-    lib = np if isinstance(predictions, np.ndarray) else torch
     if masks is not None:
-        mask_shape = masks.shape
-        assert (
-            mask_shape == target_shape
-        ), f"shape of masks must match predictions' shape, but got {mask_shape} and {prediction_shape} "
-
         return lib.sum(lib.abs(predictions - targets) * masks) / (
             lib.sum(masks) + 1e-12
         )
@@ -80,9 +114,9 @@
 
 
 def calc_mse(
-    predictions: Union[np.ndarray, torch.Tensor, list],
-    targets: Union[np.ndarray, torch.Tensor, list],
-    masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
 ) -> Union[float, torch.Tensor]:
     """Calculate the Mean Square Error between ``predictions`` and ``targets``.
     ``masks`` can be used for filtering. For values==0 in ``masks``,
@@ -121,23 +155,10 @@ def calc_mse(
     so the result is 1/2=0.5.
 
     """
+    # check shapes and values of inputs
+    lib = _check_inputs(predictions, targets, masks)
 
-    assert isinstance(predictions, type(targets)), (
-        f"types of inputs and target must match, but got"
-        f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
-    )
-    prediction_shape = predictions.shape
-    target_shape = targets.shape
-    assert (
-        prediction_shape == target_shape
-    ), f"shape of predictions and targets must match, but got {prediction_shape} and {target_shape} "
-
-    lib = np if isinstance(predictions, np.ndarray) else torch
     if masks is not None:
-        mask_shape = masks.shape
-        assert (
-            mask_shape == target_shape
-        ), f"shape of masks must match predictions' shape, but got {mask_shape} and {prediction_shape} "
         return lib.sum(lib.square(predictions - targets) * masks) / (
             lib.sum(masks) + 1e-12
         )
@@ -146,9 +167,9 @@
 
 
 def calc_rmse(
-    predictions: Union[np.ndarray, torch.Tensor, list],
-    targets: Union[np.ndarray, torch.Tensor, list],
-    masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
 ) -> Union[float, torch.Tensor]:
     """Calculate the Root Mean Square Error between ``predictions`` and ``targets``.
     ``masks`` can be used for filtering. For values==0 in ``masks``,
@@ -188,18 +209,15 @@ def calc_rmse(
     so the result is :math:`\\sqrt{1/2}=0.5`.
 
     """
-    assert isinstance(predictions, type(targets)), (
-        f"types of inputs and target must match, but got"
-        f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
-    )
+    # don't have to check types and NaN here, since calc_mse() will do it
     lib = np if isinstance(predictions, np.ndarray) else torch
     return lib.sqrt(calc_mse(predictions, targets, masks))
 
 
 def calc_mre(
-    predictions: Union[np.ndarray, torch.Tensor, list],
-    targets: Union[np.ndarray, torch.Tensor, list],
-    masks: Optional[Union[np.ndarray, torch.Tensor, list]] = None,
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Optional[Union[np.ndarray, torch.Tensor]] = None,
 ) -> Union[float, torch.Tensor]:
     """Calculate the Mean Relative Error between ``predictions`` and ``targets``.
     ``masks`` can be used for filtering. For values==0 in ``masks``,
@@ -239,22 +257,10 @@ def calc_mre(
     so the result is :math:`\\sqrt{1/2}=0.5`.
 
     """
-    assert isinstance(predictions, type(targets)), (
-        f"types of inputs and target must match, but got"
-        f"type(inputs)={type(predictions)}, type(target)={type(targets)}"
-    )
-    prediction_shape = predictions.shape
-    target_shape = targets.shape
-    assert (
-        prediction_shape == target_shape
-    ), f"shape of predictions and targets must match, but got {prediction_shape} and {target_shape} "
+    # check shapes and values of inputs
+    lib = _check_inputs(predictions, targets, masks)
 
-    lib = np if isinstance(predictions, np.ndarray) else torch
     if masks is not None:
-        mask_shape = masks.shape
-        assert (
-            mask_shape == target_shape
-        ), f"shape of masks must match predictions' shape, but got {mask_shape} and {prediction_shape} "
         return lib.sum(lib.abs(predictions - targets) * masks) / (
             lib.sum(lib.abs(targets * masks)) + 1e-12
         )
@@ -273,51 +279,119 @@ def calc_quantile_loss(predictions, targets, q: float, eval_points) -> float:
     return quantile_loss
 
 
-def calc_quantile_crps(predictions, targets, eval_points, mean_scaler=0, scaler=1):
-    """Continuous rank probability score for distributional predictions."""
+def calc_quantile_crps(
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Union[np.ndarray, torch.Tensor],
+    scaler_mean=0,
+    scaler_stddev=1,
+) -> float:
+    """Continuous rank probability score for distributional predictions.
+
+    Parameters
+    ----------
+    predictions :
+        The prediction data to be evaluated.
+
+    targets :
+        The target data for helping evaluate the predictions.
+
+    masks :
+        The masks for filtering the specific values in ``predictions`` and ``targets`` from evaluation.
+        Only values at corresponding positions where values == 1 in ``masks`` will be used for evaluation.
+
+    scaler_mean:
+        Mean value of the scaler used to scale the data.
+
+    scaler_stddev:
+        Standard deviation value of the scaler used to scale the data.
+
+    Returns
+    -------
+    CRPS :
+        Value of continuous rank probability score.
+
+    """
+    # check shapes and values of inputs
+    _ = _check_inputs(predictions, targets, masks, check_shape=False)
+
     if isinstance(predictions, np.ndarray):
         predictions = torch.from_numpy(predictions)
     if isinstance(targets, np.ndarray):
         targets = torch.from_numpy(targets)
-    if isinstance(eval_points, np.ndarray):
-        eval_points = torch.from_numpy(eval_points)
+    if isinstance(masks, np.ndarray):
+        masks = torch.from_numpy(masks)
 
-    targets = targets * scaler + mean_scaler
-    predictions = predictions * scaler + mean_scaler
+    targets = targets * scaler_stddev + scaler_mean
+    predictions = predictions * scaler_stddev + scaler_mean
 
     quantiles = np.arange(0.05, 1.0, 0.05)
-    denominator = torch.sum(torch.abs(targets * eval_points))
-    CRPS = 0
+    denominator = torch.sum(torch.abs(targets * masks))
+    CRPS = torch.tensor(0.0)
     for i in range(len(quantiles)):
         q_pred = []
         for j in range(len(predictions)):
             q_pred.append(torch.quantile(predictions[j : j + 1], quantiles[i], dim=1))
         q_pred = torch.cat(q_pred, 0)
-        q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], eval_points)
+        q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], masks)
         CRPS += q_loss / denominator
     return CRPS.item() / len(quantiles)
 
 
-def calc_quantile_crps_sum(predictions, targets, eval_points, mean_scaler=0, scaler=1):
-    """Continuous rank probability score for distributional predictions."""
+def calc_quantile_crps_sum(
+    predictions: Union[np.ndarray, torch.Tensor],
+    targets: Union[np.ndarray, torch.Tensor],
+    masks: Union[np.ndarray, torch.Tensor],
+    scaler_mean=0,
+    scaler_stddev=1,
+) -> float:
+    """Sum continuous rank probability score for distributional predictions.
+
+    Parameters
+    ----------
+    predictions :
+        The prediction data to be evaluated.
+
+    targets :
+        The target data for helping evaluate the predictions.
+
+    masks :
+        The masks for filtering the specific values in ``predictions`` and ``targets`` from evaluation.
+        Only values at corresponding positions where values == 1 in ``masks`` will be used for evaluation.
+
+    scaler_mean:
+        Mean value of the scaler used to scale the data.
+
+    scaler_stddev:
+        Standard deviation value of the scaler used to scale the data.
+
+    Returns
+    -------
+    CRPS :
+        Sum value of continuous rank probability score.
+
+    """
+    # check shapes and values of inputs
+    _ = _check_inputs(predictions, targets, masks, check_shape=False)
+
     if isinstance(predictions, np.ndarray):
         predictions = torch.from_numpy(predictions)
     if isinstance(targets, np.ndarray):
         targets = torch.from_numpy(targets)
-    if isinstance(eval_points, np.ndarray):
-        eval_points = torch.from_numpy(eval_points)
+    if isinstance(masks, np.ndarray):
+        masks = torch.from_numpy(masks)
 
-    eval_points = eval_points.mean(-1)
-    targets = targets * scaler + mean_scaler
+    masks = masks.mean(-1)
+    targets = targets * scaler_stddev + scaler_mean
     targets = targets.sum(-1)
-    predictions = predictions * scaler + mean_scaler
+    predictions = predictions * scaler_stddev + scaler_mean
 
     quantiles = np.arange(0.05, 1.0, 0.05)
-    denominator = torch.sum(torch.abs(targets * eval_points))
-    CRPS = 0
+    denominator = torch.sum(torch.abs(targets * masks))
+    CRPS = torch.tensor(0.0)
    for i in range(len(quantiles)):
         q_pred = torch.quantile(predictions.sum(-1), quantiles[i], dim=1)
-        q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], eval_points)
+        q_loss = calc_quantile_loss(targets, q_pred, quantiles[i], masks)
         CRPS += q_loss / denominator
     return CRPS.item() / len(quantiles)
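The refactor is easiest to sanity-check from the caller's side. Below is a minimal usage sketch (not part of the patch): it assumes a PyPOTS build with this change applied and imports directly from the module touched above; the expected values in the comments simply follow the masked formulas visible in the diff.

```python
import numpy as np

from pypots.utils.metrics.error import calc_mae, calc_mse, calc_rmse, calc_mre

predictions = np.array([1.0, 2.0])
targets = np.array([2.0, 2.0])  # absolute errors: [1, 0]
masks = np.array([1.0, 1.0])    # 1 = include this position in the evaluation

print(calc_mae(predictions, targets, masks))   # (1 + 0) / 2 = 0.5
print(calc_mse(predictions, targets, masks))   # (1 + 0) / 2 = 0.5
print(calc_rmse(predictions, targets, masks))  # sqrt(0.5) ~= 0.7071
print(calc_mre(predictions, targets, masks))   # 1 / (|2| + |2|) = 0.25

# _check_inputs now rejects bad inputs up front instead of letting them
# fail (or silently succeed) deep inside the arithmetic:
try:
    calc_mae(np.array([[1.0, 2.0]]), targets, masks)  # shape mismatch
except AssertionError as err:
    print(err)

try:
    calc_mae(np.array([np.nan, 2.0]), targets, masks)  # NaN in predictions
except AssertionError as err:
    print(err)
```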
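For the CRPS pair, `eval_points` becomes `masks` and `mean_scaler`/`scaler` become `scaler_mean`/`scaler_stddev`; validation runs with `check_shape=False` because `predictions` carries an extra sample dimension. The sketch below is hedged accordingly: the `(batch, n_samples, n_steps, n_features)` layout is inferred from the `torch.quantile(..., dim=1)` call over the sample dimension, not something the patch itself documents.

```python
import numpy as np

from pypots.utils.metrics.error import calc_quantile_crps, calc_quantile_crps_sum

batch, n_samples, n_steps, n_features = 4, 10, 24, 3
rng = np.random.default_rng(2023)

# n_samples draws from the predictive distribution for every target point
predictions = rng.normal(size=(batch, n_samples, n_steps, n_features))
targets = rng.normal(size=(batch, n_steps, n_features))
# masks must match the shape of targets; 1 = evaluate at this position
masks = (rng.random(size=(batch, n_steps, n_features)) > 0.3).astype(float)

crps = calc_quantile_crps(predictions, targets, masks)
crps_sum = calc_quantile_crps_sum(predictions, targets, masks)
print(crps, crps_sum)  # both plain Python floats

# If the data were standardized before modeling, pass the scaler statistics
# (renamed in this patch) so the score is computed on the original scale:
crps_rescaled = calc_quantile_crps(
    predictions, targets, masks, scaler_mean=1.5, scaler_stddev=2.0
)
```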