From 9df1f11ef77dcbccd756e4f95021143f326b2c58 Mon Sep 17 00:00:00 2001 From: Diane Napolitano Date: Wed, 31 Jan 2024 17:09:28 -0500 Subject: [PATCH] Spinning the bootstrap stuff into its own files --- .../BootstrapTransitionMatrixSolver.py | 199 ++++++++ ...test_bootstrap_transition_matrix_solver.py | 466 ++++++++++++++++++ 2 files changed, 665 insertions(+) create mode 100644 src/elexsolver/BootstrapTransitionMatrixSolver.py create mode 100644 tests/test_bootstrap_transition_matrix_solver.py diff --git a/src/elexsolver/BootstrapTransitionMatrixSolver.py b/src/elexsolver/BootstrapTransitionMatrixSolver.py new file mode 100644 index 0000000..38ac22e --- /dev/null +++ b/src/elexsolver/BootstrapTransitionMatrixSolver.py @@ -0,0 +1,199 @@ +import logging +import warnings + +import cvxpy as cp +import numpy as np +from tqdm import tqdm + +from elexsolver.logging import initialize_logging +from elexsolver.TransitionSolver import TransitionSolver + +initialize_logging() + +LOG = logging.getLogger(__name__) + + +class TransitionMatrixSolver(TransitionSolver): + """ + Matrix regression transition solver using CVXPY. + """ + + def __init__(self, strict: bool = True, lam: float | None = None): + """ + Parameters + ---------- + `strict` : bool, default True + If True, solution will be constrainted so that all coefficients are >= 0, + <= 1, and the sum of each row equals 1. + `lam` : float, optional + `lam` != 0 will enable L2 regularization (Ridge). + """ + super().__init__() + self._strict = strict + self._lambda = lam + + @staticmethod + def __get_constraints(coef: np.ndarray, strict: bool) -> list: + if strict: + return [0 <= coef, coef <= 1, cp.sum(coef, axis=1) == 1] + return [cp.sum(coef, axis=1) <= 1.1, cp.sum(coef, axis=1) >= 0.9] + + def __standard_objective(self, A: np.ndarray, B: np.ndarray, beta: np.ndarray) -> cp.Minimize: + loss_function = cp.norm(A @ beta - B, "fro") + return cp.Minimize(loss_function) + + def __ridge_objective(self, A: np.ndarray, B: np.ndarray, beta: np.ndarray) -> cp.Minimize: + # Based on https://www.cvxpy.org/examples/machine_learning/ridge_regression.html + lam = cp.Parameter(nonneg=True, value=self._lambda) + loss_function = cp.pnorm(A @ beta - B, p=2) ** 2 + regularizer = cp.pnorm(beta, p=2) ** 2 + return cp.Minimize(loss_function + lam * regularizer) + + def __solve(self, A: np.ndarray, B: np.ndarray, weights: np.ndarray) -> np.ndarray: + transition_matrix = cp.Variable((A.shape[1], B.shape[1]), pos=True) + Aw = np.dot(weights, A) + Bw = np.dot(weights, B) + + if self._lambda is None or self._lambda == 0: + objective = self.__standard_objective(Aw, Bw, transition_matrix) + else: + objective = self.__ridge_objective(Aw, Bw, transition_matrix) + + constraints = TransitionMatrixSolver.__get_constraints(transition_matrix, self._strict) + problem = cp.Problem(objective, constraints) + + with warnings.catch_warnings(): + warnings.simplefilter("error") + try: + problem.solve(solver=cp.CLARABEL) + except (UserWarning, cp.error.SolverError) as e: + LOG.error(e) + return np.zeros((A.shape[1], B.shape[1])) + + return transition_matrix.value + + def fit_predict(self, X: np.ndarray, Y: np.ndarray, weights: np.ndarray | None = None) -> np.ndarray: + self._check_data_type(X) + self._check_data_type(Y) + self._check_any_element_nan_or_inf(X) + self._check_any_element_nan_or_inf(Y) + + # matrices should be (units x things), where the number of units is > the number of things + if X.shape[1] > X.shape[0]: + X = X.T + if Y.shape[1] > Y.shape[0]: + Y = Y.T + + if X.shape[0] != Y.shape[0]: + raise ValueError(f"Number of units in X ({X.shape[0]}) != number of units in Y ({Y.shape[0]}).") + + self._check_dimensions(X) + self._check_dimensions(Y) + self._check_for_zero_units(X) + self._check_for_zero_units(Y) + + if not isinstance(X, np.ndarray): + X = X.to_numpy() + if not isinstance(Y, np.ndarray): + Y = Y.to_numpy() + + X_expected_totals = X.sum(axis=0) / X.sum(axis=0).sum() + + X = self._rescale(X) + Y = self._rescale(Y) + + weights = self._check_and_prepare_weights(X, Y, weights) + + percentages = self.__solve(X, Y, weights) + self._transitions = np.diag(X_expected_totals) @ percentages + return percentages + + +class BootstrapTransitionMatrixSolver(TransitionSolver): + """ + Bootstrap version of the matrix regression transition solver. + """ + + def __init__(self, B: int = 1000, strict: bool = True, verbose: bool = True, lam: int | None = None): + """ + Parameters + ---------- + `B` : int, default 1000 + Number of bootstrap samples to draw and matrix solver models to fit/predict. + `strict` : bool, default True + If `True`, solution will be constrainted so that all coefficients are >= 0, + <= 1, and the sum of each row equals 1. + `verbose` : bool, default True + If `False`, this will reduce the amount of logging produced for each of the `B` bootstrap samples. + `lam` : float, optional + `lam` != 0 will enable L2 regularization (Ridge). + """ + super().__init__() + self._strict = strict + self._B = B + self._verbose = verbose + self._lambda = lam + + # class members that are instantiated during model-fit + self._predicted_percentages = None + self._X_expected_totals = None + + def fit_predict(self, X: np.ndarray, Y: np.ndarray, weights: np.ndarray | None = None) -> np.ndarray: + self._predicted_percentages = [] + + # assuming pandas.DataFrame + if not isinstance(X, np.ndarray): + X = X.to_numpy() + if not isinstance(Y, np.ndarray): + Y = Y.to_numpy() + + self._X_expected_totals = X.sum(axis=0) / X.sum(axis=0).sum() + + tm = TransitionMatrixSolver(strict=self._strict, lam=self._lambda) + self._predicted_percentages.append(tm.fit_predict(X, Y, weights=weights)) + + for b in tqdm(range(0, self._B - 1), desc="Bootstrapping", disable=not self._verbose): + rng = np.random.default_rng(seed=b) + X_resampled = rng.choice( + X, len(X), replace=True, axis=0, p=(weights / weights.sum() if weights is not None else None) + ) + indices = [np.where((X == x).all(axis=1))[0][0] for x in X_resampled] + Y_resampled = Y[indices] + self._predicted_percentages.append(tm.fit_predict(X_resampled, Y_resampled, weights=None)) + + percentages = np.mean(self._predicted_percentages, axis=0) + self._transitions = np.diag(self._X_expected_totals) @ percentages + return percentages + + def get_confidence_interval(self, alpha: float, transitions: bool = False) -> (np.ndarray, np.ndarray): + """ + Parameters + ---------- + `alpha` : float + Value between [0, 1). If greater than 1, will be divided by 100. + `transitions` : bool, default False + If True, the returned matrices will represent transitions, not percentages. + + Returns + ------- + A tuple of two np.ndarray matrices of float. Element 0 has the lower bound and 1 has the upper bound. + """ + if alpha > 1: + alpha = alpha / 100 + if alpha < 0 or alpha >= 1: + raise ValueError(f"Invalid confidence interval {alpha}.") + + p_lower = ((1.0 - alpha) / 2.0) * 100 + p_upper = ((1.0 + alpha) / 2.0) * 100 + + percentages = ( + np.percentile(self._predicted_percentages, p_lower, axis=0), + np.percentile(self._predicted_percentages, p_upper, axis=0), + ) + + if transitions: + return ( + np.diag(self._X_expected_totals) @ percentages[0], + np.diag(self._X_expected_totals) @ percentages[1], + ) + return percentages diff --git a/tests/test_bootstrap_transition_matrix_solver.py b/tests/test_bootstrap_transition_matrix_solver.py new file mode 100644 index 0000000..a11ffdc --- /dev/null +++ b/tests/test_bootstrap_transition_matrix_solver.py @@ -0,0 +1,466 @@ +import numpy as np +import pytest + +from elexsolver.TransitionMatrixSolver import BootstrapTransitionMatrixSolver, TransitionMatrixSolver + +RTOL = 1e-04 +ATOL = 1e-04 + + +def test_matrix_fit_predict(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ) + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ] + ) + + expected = np.array([[0.760428, 0.239572], [0.216642, 0.783358]]) + + tms = TransitionMatrixSolver() + current = tms.fit_predict(X, Y) + np.testing.assert_allclose(expected, current, rtol=RTOL, atol=ATOL) + + +def test_matrix_fit_predict_with_weights(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ) + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ] + ) + + weights = np.array([500, 250, 125, 62.5, 31.25, 15.625]) + + expected = np.array([[0.737329, 0.262671], [0.230589, 0.769411]]) + + tms = TransitionMatrixSolver() + current = tms.fit_predict(X, Y, weights=weights) + np.testing.assert_allclose(expected, current, rtol=RTOL, atol=ATOL) + + +def test_matrix_fit_predict_not_strict(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ) + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ] + ) + + expected = np.array([[0.760451, 0.239558], [0.216624, 0.783369]]) + + tms = TransitionMatrixSolver(strict=False) + current = tms.fit_predict(X, Y) + np.testing.assert_allclose(expected, current, rtol=RTOL, atol=ATOL) + + +def test_ridge_matrix_fit_predict(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ) + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ] + ) + + expected = np.array([[0.479416, 0.520584], [0.455918, 0.544082]]) + + tms = TransitionMatrixSolver(lam=1) + current = tms.fit_predict(X, Y) + np.testing.assert_allclose(expected, current, rtol=RTOL, atol=ATOL) + + +def test_matrix_fit_predict_pivoted(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ).T + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ] + ).T + + expected = np.array([[0.760428, 0.239572], [0.216642, 0.783358]]) + + tms = TransitionMatrixSolver() + current = tms.fit_predict(X, Y) + np.testing.assert_allclose(expected, current, rtol=RTOL, atol=ATOL) + + +def test_matrix_get_prediction_interval(): + tms = TransitionMatrixSolver() + with pytest.raises(NotImplementedError): + tms.get_prediction_interval(0) + + +def test_matrix_fit_predict_bad_dimensions(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ) + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + ] + ) + + tms = TransitionMatrixSolver() + with pytest.raises(ValueError): + tms.fit_predict(X, Y) + + +def test_matrix_fit_predict_pandas(): + try: + import pandas # pylint: disable=import-outside-toplevel + + X = pandas.DataFrame( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ], + columns=["x1", "x2"], + ) + + Y = pandas.DataFrame( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ], + columns=["y1", "y2"], + ) + + expected = np.array([[0.760428, 0.239572], [0.216642, 0.783358]]) + + tms = TransitionMatrixSolver() + current = tms.fit_predict(X, Y) + np.testing.assert_allclose(expected, current, rtol=RTOL, atol=ATOL) + + except ImportError: + # pass this test through since pandas isn't a requirement for elex-solver + assert True + + +def test_bootstrap_fit_predict(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ) + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ] + ) + + expected = np.array([[0.809393, 0.190607], [0.173843, 0.826157]]) + + btms = BootstrapTransitionMatrixSolver(B=10, verbose=False) + current = btms.fit_predict(X, Y) + np.testing.assert_allclose(expected, current, rtol=RTOL, atol=ATOL) + + +def test_bootstrap_fit_predict_with_weights(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ) + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ] + ) + + weights = np.array([500, 250, 125, 62.5, 31.25, 15.625]) + + expected = np.array([[0.739798, 0.260202], [0.229358, 0.770642]]) + + btms = BootstrapTransitionMatrixSolver(B=10, verbose=False) + current = btms.fit_predict(X, Y, weights=weights) + np.testing.assert_allclose(expected, current, rtol=RTOL, atol=ATOL) + + +def test_bootstrap_confidence_interval_percentages(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ) + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ] + ) + + expected_lower = np.array([[0.757573, 0.095978], [0.09128, 0.779471]]) + expected_upper = np.array([[0.904022, 0.242427], [0.220529, 0.90872]]) + + btms = BootstrapTransitionMatrixSolver(B=10, verbose=False) + _ = btms.fit_predict(X, Y) + (current_lower, current_upper) = btms.get_confidence_interval(0.95, transitions=False) + np.testing.assert_allclose(expected_lower, current_lower, rtol=RTOL, atol=ATOL) + np.testing.assert_allclose(expected_upper, current_upper, rtol=RTOL, atol=ATOL) + + +def test_bootstrap_confidence_interval_greater_than_1(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ) + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ] + ) + + expected_lower = np.array([[0.757573, 0.095978], [0.09128, 0.779471]]) + expected_upper = np.array([[0.904022, 0.242427], [0.220529, 0.90872]]) + + btms = BootstrapTransitionMatrixSolver(B=10, verbose=False) + _ = btms.fit_predict(X, Y) + (current_lower, current_upper) = btms.get_confidence_interval(95, transitions=False) + np.testing.assert_allclose(expected_lower, current_lower, rtol=RTOL, atol=ATOL) + np.testing.assert_allclose(expected_upper, current_upper, rtol=RTOL, atol=ATOL) + + +def test_bootstrap_confidence_interval_invalid(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ) + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ] + ) + + btms = BootstrapTransitionMatrixSolver(B=10, verbose=False) + _ = btms.fit_predict(X, Y) + + with pytest.raises(ValueError): + btms.get_confidence_interval(-34) + + +def test_bootstrap_confidence_interval_transitions(): + X = np.array( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ] + ) + + Y = np.array( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ] + ) + + expected_lower = np.array([[0.349649, 0.044297], [0.049151, 0.419715]]) + expected_upper = np.array([[0.417241, 0.111889], [0.118746, 0.489311]]) + + btms = BootstrapTransitionMatrixSolver(B=10, verbose=False) + _ = btms.fit_predict(X, Y) + (current_lower, current_upper) = btms.get_confidence_interval(0.95, transitions=True) + np.testing.assert_allclose(expected_lower, current_lower, rtol=RTOL, atol=ATOL) + np.testing.assert_allclose(expected_upper, current_upper, rtol=RTOL, atol=ATOL) + + +def test_bootstrap_get_prediction_interval(): + btms = BootstrapTransitionMatrixSolver(B=10, verbose=False) + with pytest.raises(NotImplementedError): + btms.get_prediction_interval(0) + + +def test_bootstrap_fit_predict_pandas(): + try: + import pandas # pylint: disable=import-outside-toplevel + + X = pandas.DataFrame( + [ + [1, 2], + [3, 4], + [5, 6], + [7, 8], + [9, 10], + [11, 12], + ], + columns=["x1", "x2"], + ) + + Y = pandas.DataFrame( + [ + [2, 3], + [4, 5], + [6, 7], + [8, 9], + [10, 11], + [12, 13], + ], + columns=["y1", "y2"], + ) + + expected = np.array([[0.809393, 0.190607], [0.173843, 0.826157]]) + + btms = BootstrapTransitionMatrixSolver(B=10, verbose=False) + current = btms.fit_predict(X, Y) + np.testing.assert_allclose(expected, current, rtol=RTOL, atol=ATOL) + + except ImportError: + # pass this test through since pandas isn't a requirement for elex-solver + assert True