diff --git a/pysindy/optimizers/stlsq.py b/pysindy/optimizers/stlsq.py index 0bc09699a..6cc4dd20e 100644 --- a/pysindy/optimizers/stlsq.py +++ b/pysindy/optimizers/stlsq.py @@ -1,8 +1,10 @@ import warnings +from typing import Union import numpy as np from scipy.linalg import LinAlgWarning from sklearn.exceptions import ConvergenceWarning +from sklearn.linear_model import LinearRegression from sklearn.linear_model import ridge_regression from sklearn.utils.validation import check_is_fitted @@ -62,6 +64,11 @@ class STLSQ(BaseOptimizer): verbose : bool, optional (default False) If True, prints out the different error terms every iteration. + sparse_ind : list, optional (default None) + Indices to threshold and perform ridge regression upon. + If None, sparse thresholding and ridge regression is applied to all + indices. + Attributes ---------- coef_ : array, shape (n_features,) or (n_targets, n_features) @@ -107,6 +114,7 @@ def __init__( copy_X=True, initial_guess=None, verbose=False, + sparse_ind=None, ): super(STLSQ, self).__init__( max_iter=max_iter, @@ -125,28 +133,56 @@ def __init__( self.ridge_kw = ridge_kw self.initial_guess = initial_guess self.verbose = verbose + self.sparse_ind = sparse_ind - def _sparse_coefficients(self, dim, ind, coef, threshold): - """Perform thresholding of the weight vector(s)""" + def _sparse_coefficients( + self, dim: int, ind_nonzero: np.ndarray, coef: np.ndarray, threshold: float + ) -> (np.ndarray, np.ndarray): + """Perform thresholding of the weight vector(s) (on specific indices + if ``self.sparse_ind`` is not None)""" c = np.zeros(dim) - c[ind] = coef + c[ind_nonzero] = coef big_ind = np.abs(c) >= threshold - c[~big_ind] = 0 + if self.sparse_ind is not None: + sparse_ind_mask = np.zeros_like(ind_nonzero) + sparse_ind_mask[self.sparse_ind] = True + c[~big_ind & sparse_ind_mask] = 0 + if self.sparse_ind is None: + c[~big_ind] = 0 return c, big_ind - def _regress(self, x, y): - """Perform the ridge regression""" + def _regress(self, x: np.ndarray, y: np.ndarray, dim: int, sparse_sub: np.ndarray): + """Perform the ridge regression (on specific indices if + ``self.sparse_ind`` is not None)""" kw = self.ridge_kw or {} - - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=LinAlgWarning) - try: - coef = ridge_regression(x, y, self.alpha, **kw) - except LinAlgWarning: - # increase alpha until warning stops - self.alpha = 2 * self.alpha - self.iters += 1 - return coef + if self.sparse_ind is None: + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=LinAlgWarning) + try: + coef = ridge_regression(x, y, self.alpha, **kw) + except LinAlgWarning: + # increase alpha until warning stops + self.alpha = 2 * self.alpha + self.iters += 1 + return coef + if self.sparse_ind is not None: + alpha_array = np.zeros((dim, dim)) + alpha_array[sparse_sub, sparse_sub] = np.sqrt(self.alpha) + x_lin = np.concatenate((x, alpha_array), axis=0) + y_lin = np.concatenate((y, np.zeros((dim,)))) + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=LinAlgWarning) + try: + coef = ( + LinearRegression(fit_intercept=False, **kw) + .fit(x_lin, y_lin) + .coef_ + ) + except LinAlgWarning: + # increase alpha until warning stops + self.alpha = 2 * self.alpha + self.iters += 1 + return coef def _no_change(self): """Check if the coefficient mask has changed after thresholding""" @@ -171,6 +207,7 @@ def _reduce(self, x, y): n_samples, n_features = x.shape n_targets = y.shape[1] n_features_selected = np.sum(ind) + sparse_sub = [np.array(self.sparse_ind)] * y.shape[1] # Print initial values for each term in the optimization if self.verbose: @@ -203,10 +240,19 @@ def _reduce(self, x, y): "coefficients".format(self.threshold) ) continue - coef_i = self._regress(x[:, ind[i]], y[:, i]) + coef_i = self._regress( + x[:, ind[i]], y[:, i], np.count_nonzero(ind[i]), sparse_sub[i] + ) coef_i, ind_i = self._sparse_coefficients( n_features, ind[i], coef_i, self.threshold ) + if self.sparse_ind is not None: + vals_to_remove = np.intersect1d( + self.sparse_ind, np.where(coef_i == 0) + ) + sparse_sub[i] = _remove_and_decrement( + self.sparse_ind, vals_to_remove + ) coef[i] = coef_i ind[i] = ind_i @@ -248,3 +294,14 @@ def complexity(self): return np.count_nonzero(self.coef_) + np.count_nonzero( [abs(self.intercept_) >= self.threshold] ) + + +def _remove_and_decrement( + existing_vals: Union[np.ndarray, list], vals_to_remove: Union[np.ndarray, list] +) -> np.ndarray: + """Remove elements from existing values and decrement the elements + that are greater than the removed elements""" + for s in reversed(vals_to_remove): + existing_vals = np.delete(existing_vals, np.where(s == existing_vals)) + existing_vals = np.where(existing_vals > s, existing_vals - 1, existing_vals) + return existing_vals diff --git a/test/test_optimizers.py b/test/test_optimizers.py index 33d5e18a3..228191f57 100644 --- a/test/test_optimizers.py +++ b/test/test_optimizers.py @@ -29,6 +29,7 @@ from pysindy.optimizers import StableLinearSR3 from pysindy.optimizers import STLSQ from pysindy.optimizers import TrappingSR3 +from pysindy.optimizers.stlsq import _remove_and_decrement from pysindy.utils import supports_multiple_targets from pysindy.utils.odes import enzyme @@ -1133,3 +1134,36 @@ def test_frols_error_linear_dependence(): y = np.array([[1.0, 1.0]]) with pytest.raises(ValueError): opt.fit(x, y) + + +def test_sparse_subset_multitarget(): + A = np.diag([1, 1, 1, 1]) + b = np.array([[1, 1, 0.5, 1], [1, 1, 1, 0.5]]).T + opt = STLSQ(threshold=0.5, alpha=0.1, sparse_ind=[2, 3]) + opt.fit(A, b) + X = opt.coef_ + assert X[0, 2] == 0.0 + assert X[0, 3] > 0.0 and X[0, 3] < 1.0 + np.testing.assert_equal(X[:, :2], np.ones((2, 2))) + assert X[1, 3] == 0.0 + assert X[1, 2] > 0.0 and X[1, 2] < 1.0 + + +def test_sparse_subset_off_diagonal(): + A = np.array([[1, 1], [0, 1]]) + b = np.array([1, 1]) + opt = STLSQ(threshold=0.1, alpha=0.1, sparse_ind=[1]) + opt.fit(A, b) + X = opt.coef_ + assert X[0, 0] > 0.0 and X[0, 0] < 0.5 + assert X[0, 1] > 0.5 and X[0, 1] < 1.0 + + +def test_remove_and_decrement(): + existing_vals = np.array([2, 3, 4, 5]) + vals_to_remove = np.array([3, 5]) + expected = np.array([2, 3]) + result = _remove_and_decrement( + existing_vals=existing_vals, vals_to_remove=vals_to_remove + ) + np.testing.assert_array_equal(expected, result)