refactor: remove target transformation #13

Merged 3 commits on Feb 4, 2024
10 changes: 7 additions & 3 deletions src/neo_ls_svm/_affine_feature_map.py
@@ -1,7 +1,7 @@
"""Affine feature map."""

from functools import cached_property
from typing import TypeVar, cast
from typing import Any, TypeVar, cast

import numpy as np
import numpy.typing as npt
@@ -86,7 +86,7 @@ def transform(self, X: FloatMatrix[F]) -> FloatMatrix[F]:
                 if A.shape[1] < A.shape[0]
                 else (X - shift) @ (A / scale.T)
             )
-        )
+        ).astype(X.dtype)
         if self.append_features and A is not None:
             X_transformed = np.hstack((X, X_transformed))
         return X_transformed
@@ -108,7 +108,7 @@ def inverse_transform(self, X_transformed: FloatMatrix[F]) -> FloatMatrix[F]:
         if A is not None:
             pinvA = cast(FloatMatrix[F], self.pseudo_inverse)
             X = X @ pinvA
-        X = (X * scale + shift).astype(shift.dtype)
+        X = (X * scale + shift).astype(X.dtype)
         return X

@@ -130,3 +130,7 @@ def get_feature_names_out(
         if self.append_features and A is not None:
             output_features = np.hstack((input_features_array, output_features))
         return output_features
+
+    def _more_tags(self) -> dict[str, Any]:
+        # https://scikit-learn.org/stable/developers/develop.html#estimator-tags
+        return {"preserves_dtype": [np.float64, np.float32]}
46 changes: 16 additions & 30 deletions src/neo_ls_svm/_neo_ls_svm.py
@@ -54,15 +54,13 @@ def __init__(  # noqa: PLR0913
         primal_feature_map: KernelApproximatingFeatureMap | None = None,
         dual_feature_map: AffineSeparator | None = None,
         dual: bool | None = None,
-        max_epochs: int = 1,
         refit: bool = False,
         random_state: int | np.random.RandomState | None = 42,
         estimator_type: Literal["classifier", "regressor"] | None = None,
     ) -> None:
         self.primal_feature_map = primal_feature_map
         self.dual_feature_map = dual_feature_map
         self.dual = dual
-        self.max_epochs = max_epochs
         self.refit = refit
         self.random_state = random_state
         self.estimator_type = estimator_type
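With `max_epochs` gone, the constructor takes only the remaining keyword arguments; a hedged usage sketch (the top-level import path is assumed):

from neo_ls_svm import NeoLSSVM  # assumed import path

model = NeoLSSVM(dual=False, refit=True, random_state=42)  # no more max_epochs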
@@ -140,9 +138,7 @@ def _optimize_β̂_γ(
         rγ = 1 / (self.γs_[np.newaxis, :] + λ[:, np.newaxis])
         with np.errstate(divide="ignore", invalid="ignore"):
             loo_residuals = (φβ̂ @ rγ - y[:, np.newaxis]) / (1 - h @ rγ)
-        loo_residuals = loo_residuals * self.y_scale_
-        y_true = y * self.y_scale_ + self.y_shift_
-        ŷ_loo = loo_residuals + y_true[:, np.newaxis]
+        ŷ_loo = y[:, np.newaxis] + loo_residuals
         # In the case of binary classification, clip overly positive and overly negative
         # predictions' residuals to 0 when the labels are positive and negative, respectively.
         if self._estimator_type == "classifier":
Expand All @@ -163,14 +159,14 @@ def _optimize_β̂_γ(
self.loo_leverage_ = h @ rγ[:, optimum]
self.loo_error_ = self.loo_errors_γs_[optimum]
if self._estimator_type == "classifier":
self.loo_score_ = accuracy_score(y_true, np.sign(ŷ_loo[:, optimum]), sample_weight=s)
self.loo_score_ = accuracy_score(y, np.sign(ŷ_loo[:, optimum]), sample_weight=s)
elif self._estimator_type == "regressor":
self.loo_score_ = r2_score(y_true, ŷ_loo[:, optimum], sample_weight=s)
self.loo_score_ = r2_score(y, ŷ_loo[:, optimum], sample_weight=s)
β̂, γ = β̂ @ rγ[:, optimum], self.γs_[optimum]
# Resolve the linear system for better accuracy.
if self.refit:
β̂ = np.linalg.solve(γ * C + A, φSTSy)
self.residuals_ = (np.real(φ @ β̂) - y) * self.y_scale_
self.residuals_ = np.real(φ @ β̂) - y
if self._estimator_type == "classifier":
self.residuals_[(y > 0) & (self.residuals_ > 0)] = 0
self.residuals_[(y < 0) & (self.residuals_ < 0)] = 0
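The clipping above is unchanged by this PR but now acts on label-space residuals directly: a prediction that overshoots its ±1 label incurs no error. A small numeric sketch (hypothetical values):

import numpy as np

y = np.array([1.0, 1.0, -1.0, -1.0])  # binary labels encoded as ±1
ŷ = np.array([1.7, 0.2, -2.0, 0.4])   # raw decision values
residuals = ŷ - y                      # [0.7, -0.8, -1.0, 1.4]

# Overshooting the correct label is not an error:
residuals[(y > 0) & (residuals > 0)] = 0  # sample 0: 1.7 beyond +1
residuals[(y < 0) & (residuals < 0)] = 0  # sample 2: -2.0 beyond -1
# residuals is now [0.0, -0.8, 0.0, 1.4]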
@@ -273,9 +269,7 @@ def _optimize_α̂_γ(
         np.fill_diagonal(F_loo, 0)
         α̂_loo = α̂ @ (1 / (self.γs_[np.newaxis, :] * ρ + λ[:, np.newaxis]))
         ŷ_loo = np.sum(F_loo[:, np.newaxis, :] * H_loo, axis=2) * α̂_loo + F_loo @ α̂_loo
-        ŷ_loo = ŷ_loo * self.y_scale_ + self.y_shift_
-        y_true = y * self.y_scale_ + self.y_shift_
-        loo_residuals = ŷ_loo - y_true[:, np.newaxis]
+        loo_residuals = ŷ_loo - y[:, np.newaxis]
         # In the case of binary classification, clip overly positive and overly negative
         # predictions' residuals to 0 when the labels are positive and negative, respectively.
         if self._estimator_type == "classifier":
Expand All @@ -295,21 +289,21 @@ def _optimize_α̂_γ(
self.loo_residuals_ = loo_residuals[:, optimum]
self.loo_error_ = self.loo_errors_γs_[optimum]
if self._estimator_type == "classifier":
self.loo_score_ = accuracy_score(y_true, np.sign(ŷ_loo[:, optimum]), sample_weight=s)
self.loo_score_ = accuracy_score(y, np.sign(ŷ_loo[:, optimum]), sample_weight=s)
elif self._estimator_type == "regressor":
self.loo_score_ = r2_score(y_true, ŷ_loo[:, optimum], sample_weight=s)
self.loo_score_ = r2_score(y, ŷ_loo[:, optimum], sample_weight=s)
α̂, γ = α̂_loo[:, optimum], self.γs_[optimum]
# Resolve the linear system for better accuracy.
if self.refit:
α̂ = np.linalg.solve(γ * ρ * np.diag(sn**-2) + K, y)
self.residuals_ = (F @ α̂ - y) * self.y_scale_
self.residuals_ = F @ α̂ - y
if self._estimator_type == "classifier":
self.residuals_[(y > 0) & (self.residuals_ > 0)] = 0
self.residuals_[(y < 0) & (self.residuals_ < 0)] = 0
# TODO: Print warning if optimal γ is found at the edge.
return α̂, γ

def fit( # noqa: PLR0915
def fit(
self, X: FloatMatrix[F], y: GenericVector, sample_weight: FloatVector[F] | None = None
) -> "NeoLSSVM":
"""Fit this predictor."""
@@ -347,19 +341,10 @@ def fit(  # noqa: PLR0915
             y_ = np.ones(y.shape, dtype=X.dtype)
             y_[negatives] = -1
         elif self._estimator_type == "regressor":
-            y_ = cast(npt.NDArray[np.floating[Any]], y)
+            y_ = y.astype(X.dtype)
         else:
             message = "Target type not supported"
             raise ValueError(message)
-        # Fit robust shift and scale parameters for the target y.
-        if self._estimator_type == "classifier":
-            self.y_shift_: float = 0.0
-            self.y_scale_: float = 1.0
-        elif self._estimator_type == "regressor":
-            l, self.y_shift_, u = np.quantile(y_, [0.05, 0.5, 0.95])  # noqa: E741
-            self.y_scale_ = np.maximum(np.abs(l - self.y_shift_), np.abs(u - self.y_shift_))
-            self.y_scale_ = 1.0 if self.y_scale_ <= np.finfo(X.dtype).eps else self.y_scale_
-        y_ = ((y_ - self.y_shift_) / self.y_scale_).astype(X.dtype)
         # Determine whether we want to solve this in the primal or dual space.
         self.dual_ = X.shape[0] <= 1024 if self.dual is None else self.dual  # noqa: PLR2004
         self.primal_ = not self.dual_
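This hunk is the heart of the PR: the robust target shift (the median) and scale (the larger of the distances to the 5% and 95% quantiles) are dropped, and the model now fits the raw targets cast to X's dtype. Anyone who still wants target standardization can reintroduce it outside the estimator; a hedged sketch with scikit-learn's TransformedTargetRegressor (the import path for NeoLSSVM is assumed):

from sklearn.compose import TransformedTargetRegressor
from sklearn.preprocessing import StandardScaler

from neo_ls_svm import NeoLSSVM  # assumed import path

model = TransformedTargetRegressor(
    regressor=NeoLSSVM(estimator_type="regressor"),
    transformer=StandardScaler(),  # shift/scale y outside the estimator
)
# model.fit(X, y) scales y before fitting; model.predict(X) undoes it.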
@@ -390,7 +375,7 @@ def fit(  # noqa: PLR0915
             self.predict_proba_calibrator_ = IsotonicRegression(
                 out_of_bounds="clip", y_min=0, y_max=1, increasing=True
             )
-            ŷ_loo = self.loo_residuals_ + y_
+            ŷ_loo = y_ + self.loo_residuals_
             target = np.zeros_like(y_)
             target[y_ == np.max(y_)] = 1.0
             self.predict_proba_calibrator_.fit(ŷ_loo, target, sample_weight_)
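For reference, this calibration step fits a monotone map from leave-one-out decision values to the binary target, which `predict_proba` can later apply to fresh decision values. A self-contained sketch of the same pattern on synthetic data (not the estimator's internals):

import numpy as np
from sklearn.isotonic import IsotonicRegression

rng = np.random.default_rng(42)
ŷ_loo = rng.normal(size=200)  # stand-in LOO decision values
target = (ŷ_loo + 0.5 * rng.normal(size=200) > 0).astype(float)

calibrator = IsotonicRegression(out_of_bounds="clip", y_min=0, y_max=1, increasing=True)
calibrator.fit(ŷ_loo, target)
p_pos = calibrator.predict(np.array([-2.0, 0.0, 2.0]))  # calibrated P(y = +1)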
Expand All @@ -404,10 +389,11 @@ def decision_function(self, X: FloatMatrix[F]) -> FloatVector[F]:
φ = cast(KernelApproximatingFeatureMap, self.primal_feature_map_).transform(X)
ŷ = np.real(φ @ self.β̂_)
else:
# Shift and scale X, then predict as ŷ(x) := k(x, X) â + 1'â.
# Apply an affine transformation to X, then predict as ŷ(x) := k(x, X) â + 1'â.
X = cast(AffineFeatureMap, self.dual_feature_map_).transform(X)
K = rbf_kernel(X, self.X_, gamma=0.5)
ŷ = K @ self.α̂_ + np.sum(self.α̂_)
b = np.sum(self.α̂_)
ŷ = K @ self.α̂_ + b
return ŷ
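Naming the intercept b = 1'â makes the dual prediction read like the textbook LS-SVM decision function ŷ(x) = Σᵢ α̂ᵢ k(x, xᵢ) + b. A toy numeric sketch of the same computation (stand-in data; gamma=0.5 as in the diff):

import numpy as np
from sklearn.metrics.pairwise import rbf_kernel

rng = np.random.default_rng(7)
X_train = rng.standard_normal((10, 4))  # stand-in for self.X_
α̂ = rng.standard_normal(10)            # stand-in for self.α̂_
X_new = rng.standard_normal((3, 4))

K = rbf_kernel(X_new, X_train, gamma=0.5)  # k(x, X)
b = np.sum(α̂)                              # the intercept 1'â
ŷ = K @ α̂ + b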

@@ -423,8 +409,8 @@ def predict(self, X: FloatMatrix[F]) -> GenericVector:
             # Remap to the original class labels.
             ŷ = self.classes_[((ŷ_df + 1) // 2).astype(np.intp)]
         elif self._estimator_type == "regressor":
-            # Undo the label shift and scale.
-            ŷ = ŷ_df.astype(np.float64) * self.y_scale_ + self.y_shift_
+            # The decision function is the point prediction.
+            ŷ = ŷ_df
         # Map back to the training target dtype.
         ŷ = ŷ.astype(self.y_dtype_)
         return ŷ