from __future__ import annotations
import logging
import numpy as np
import pandas as pd
from sklearn.exceptions import NotFittedError
from scipy.stats import norm
from tqdm.auto import tqdm
from sklearn.linear_model import LinearRegression
from robpy.regression.base import RobustRegression, _convert_input_to_array
[docs]
class FastLTSRegression(RobustRegression):
def __init__(
self,
alpha: float = 0.5,
n_initial_subsets: int = 500,
n_initial_c_steps: int = 2,
n_best_models: int = 10,
reweighting: bool = True,
tolerance: float = 1e-15,
random_state: int = 42,
):
"""
Implementation of the FAST-LTS model based on the R implementation of the
ltsReg method in the robustbase R package (cfr.
https://www.rdocumentation.org/packages/robustbase/versions/0.93-8/topics/ltsReg)
and the python implementation `Reweighted-FastLTS` (cfr.
https://github.com/GiuseppeCannata/Reweighted-FastLTS/blob/master/Reweighted_FastLTS.py).
Args:
alpha (float, optional):
Percentage of the data to consider as subset for calculating the trimmed squared
error. Must be between 0.5 and 1, with 1 corresponding to the classic LS regression.
Defaults to 0.5.
n_initial_subset (int, optional):
Number of initial subsets to apply C-steps to (cfr `m` in original R
implementation). Defaults to 500.
n_initial_c_steps (int, optional):
Number of c-steps to apply to n_initial_subsets before final c-steps until
convergence. Defaults to 2.
n_best_models (int, optional):
Number of best models after initial c-steps to consider until convergence. Defaults
to 10.
reweighting (bool, optional):
Whether to apply reweighting to the raw estimates. Defaults to True.
tolerance (float, optional):
Acceptable delta in loss value between C-steps.
If current loss - previous loss <= tolerance, model is converged. Defaults to 1e-15.
"""
self.alpha = alpha
self.n_initial_subsets = n_initial_subsets
self.n_initial_c_steps = n_initial_c_steps
self.n_best_models = n_best_models
self.reweighting = reweighting
self.tolerance = tolerance
self.random_state = random_state
self.logger = logging.getLogger("FastLTS")
[docs]
def fit(
self,
X: np.ndarray | pd.DataFrame,
y: np.ndarray | pd.Series,
initial_weights: np.ndarray | None = None,
verbosity: int = logging.INFO,
) -> FastLTSRegression:
"""
Fit the model to the data.
Args:
X (np.ndarray | pd.DataFrame):
Training features.
y (np.ndarray | pd.Series):
Training labels.
initial_weights (np.ndarray | None, optional):
Optionally pass fixed initial weights, in case of n_initial_subsets > 1, this means
all models start from the same initial weights. There is therefore no benefit from
setting n_initial_subsets > 1. Defaults to None.
verbosity (int, optional): description. Defaults to logging.INFO.
Returns:
The fitted FastLTS object.
Reference:
- Rousseeuw P.J. (1984). Least Median of Squares Regression. Journal of the American
Statistical Association, 79(388), 871–880.
"""
self.logger.setLevel(verbosity)
X, y = _convert_input_to_array(X, y)
y = y.reshape(-1, 1)
if self.alpha < 0.5 or self.alpha > 1:
raise ValueError(
f"alpha must be between 0.5 and 1 (inclusive), but received {self.alpha}."
)
if int(X.shape[0] * self.alpha) < int((X.shape[0] + X.shape[1] + 1) / 2):
self.logger.warning(
f"h = alpha*n is too small and therefore set to [(n+p+1)/2]"
f" ({int((X.shape[0] + X.shape[1] + 1) / 2)})."
)
h = np.max([int(X.shape[0] * self.alpha), int((X.shape[0] + X.shape[1] + 1) / 2)])
self.logger.info(
f"Applying {self.n_initial_c_steps} initial c-steps "
f"on {self.n_initial_subsets} initial subsets"
)
lr_models, losses, h_subsets = self._apply_initial_C_steps(
X, y, h, initial_weights=initial_weights, verbosity=verbosity
)
best_model_idxs = np.argsort(losses)[: self.n_best_models]
best_model, best_loss, best_h_subset = (
lr_models[best_model_idxs[0]],
losses[best_model_idxs[0]],
h_subsets[best_model_idxs[0]],
)
self.logger.info(f"Performing final C-steps on {self.n_best_models} best models")
for model_idx in tqdm(best_model_idxs, disable=verbosity > logging.INFO):
(
current_model,
current_h_subset,
current_loss,
_,
) = self._apply_C_steps_untill_convergence(
lr_models[model_idx], losses[model_idx], X, y, h, self.tolerance, self.logger
)
if current_loss < best_loss:
best_loss = current_loss
best_model = current_model
best_h_subset = current_h_subset
self.model = best_model
self.best_h_subset = best_h_subset
self._scale = get_correction_factor(p=X.shape[1], n=X.shape[0], alpha=self.alpha) * np.sqrt(
best_loss
)
if self.reweighting:
residuals = y.reshape(-1, 1) - self.predict(X)
mask = (np.abs(residuals / np.std(residuals)) <= norm.ppf(0.9875)).flatten()
self.model = LinearRegression().fit(X[mask, :], y[mask])
new_subset = np.arange(X.shape[0])[mask]
best_loss = self._get_loss_value(X, y, new_subset, self.model)
self._scale = get_correction_factor_reweighting(
p=X.shape[1], n=X.shape[0], alpha=self.alpha
) * np.sqrt(best_loss)
return self
[docs]
def predict(self, X: np.ndarray | pd.DataFrame) -> np.ndarray:
if not hasattr(self, "model"):
raise NotFittedError
X, _ = _convert_input_to_array(X)
return self.model.predict(X).reshape(-1, 1)
def _apply_initial_C_steps(
self,
X: np.ndarray,
y: np.ndarray,
h: int,
initial_weights: np.ndarray | None,
verbosity: int = logging.DEBUG,
) -> tuple[list[LinearRegression], list[float], list[np.ndarray]]:
"""
Perform initial c_steps on n_initial_subsets of size n_features + 1.
Returns:
List of models, List of losses and List of h subsets.
"""
np.random.seed(self.random_state)
lr_models = []
losses = []
h_subsets = []
for seed, _ in tqdm(
enumerate(range(self.n_initial_subsets), start=self.random_state),
disable=verbosity > logging.INFO,
total=self.n_initial_subsets,
):
lr_model = self._get_initial_model(X, y, seed)
if initial_weights is not None:
self.logger.warning(
f"Initializing models with fixed weights {initial_weights} "
f"instead of random initializations."
)
lr_model.intercept_ = initial_weights[[0]]
lr_model.coef_ = initial_weights[None, 1:]
h_subset_idx = self._get_h_subset(lr_model, X, y, h)
for _ in range(self.n_initial_c_steps):
h_subset_idx, lr_model = self._apply_C_step(lr_model, X, y, h)
# get final residuals
losses.append(self._get_loss_value(X, y, h_subset_idx, lr_model))
lr_models.append(lr_model)
h_subsets.append(h_subset_idx)
return lr_models, losses, h_subsets
@staticmethod
def _get_initial_model(
X: np.ndarray,
y: np.ndarray,
random_state: int = 42,
) -> LinearRegression:
"""Get a Linear Regression model that is fitted on
a random subset of the data of size n_features + 1.
Args:
X (np.ndarray): Feature data.
y (np.ndarray): Labels.
random_state (int, optional): Random seed, will determine the random subset.
Defaults to 42.
Returns:
LinearRegression: A Linear Regression model fitted on a random subset.
"""
n_obs, n_features = X.shape # n, p
np.random.seed(random_state)
subset_idx = np.random.choice(n_obs, n_features + 1, replace=False)
lr_model = LinearRegression().fit(X[subset_idx], y[subset_idx])
return lr_model
@staticmethod
def _get_loss_value(
X: np.ndarray,
y: np.ndarray,
h_subset_idx: np.ndarray | list[int],
model: LinearRegression,
) -> float:
"""Get the Least Trimmed Squared loss for a specific model and h subset.
Args:
X (np.ndarray): Features.
y (np.ndarray): Labels.
h_subset_idx (np.ndarray): Indices of h subset.
model (LinearRegression): A trained Linear Regression model.
Returns:
float: Mean squared residual of h_subset.
"""
y_true = y[h_subset_idx].reshape(-1, 1)
y_pred = model.predict(X[h_subset_idx]).reshape(-1, 1)
residuals = y_true - y_pred
return np.sum(np.power(residuals, 2)) / len(h_subset_idx)
@staticmethod
def _apply_C_steps_untill_convergence(
current_model: LinearRegression,
previous_loss: float,
X: np.ndarray,
y: np.ndarray,
h: int,
tolerance: float = 1e-15,
logger: logging.Logger = logging.getLogger("FastLTS"),
) -> tuple[LinearRegression, np.ndarray, float, int]:
"""Apply c-steps until convergence.
Args:
current_model (LinearRegression): Model to start from.
previous_loss (float): Reference loss value.
X (np.ndarray): Training data features.
y (np.ndarray): Training data targets.
h (int): Number of samples to consider in subset.
tolerance (float, optional): Min delta in loss between iterations. Defaults to 1e-15.
logger (logging.Logger, optional): logger. Defaults to logging.getLogger('FastLTS').
Returns:
Tuple[LinearRegression, np.ndarray, float, int]:
updated model, h subset indices, final loss value, final iteration idx.
"""
iteration = 0
while True:
current_h_subset, current_model = FastLTSRegression._apply_C_step(
current_model, X, y, h
)
current_loss = FastLTSRegression._get_loss_value(X, y, current_h_subset, current_model)
logger.debug(
f"Iteration {iteration}: current loss = {current_loss:.3f}, "
f"previous loss = {previous_loss:.3f}"
)
if (previous_loss - current_loss) <= tolerance:
break
previous_loss = current_loss
iteration += 1
return current_model, current_h_subset, current_loss, iteration
@staticmethod
def _get_h_subset(
lr_model: LinearRegression, X: np.ndarray, y: np.ndarray, h: int
) -> np.ndarray:
"""Get the indices of the h observations with the smallest residuals for a given model.
Args:
lr_model (LinearRegression): A fitted Linear Regression Model.
X (np.ndarray): Features.
y (np.ndarray): Labels.
h (int): Number of observations to include in the subset.
Returns:
np.ndarray: Array of indices for the h subset.
"""
residuals = y - lr_model.predict(X).reshape(-1, 1)
return np.argsort(np.abs(residuals).flatten())[:h]
@staticmethod
def _apply_C_step(
lr_model: LinearRegression, X: np.ndarray, y: np.ndarray, h: int
) -> tuple[np.ndarray, LinearRegression]:
"""
Apply a single C-step.
Returns:
h subset indices, fitted lr model.
"""
h_subset_idx = FastLTSRegression._get_h_subset(lr_model, X, y, h)
lr_model = LinearRegression().fit(X[h_subset_idx], y[h_subset_idx])
return h_subset_idx, lr_model
[docs]
def get_correction_factor(p: int, n: int, alpha: float) -> float:
"""
Calculate the small sample correction factor for the scale resulting from LTS regression when
there is no reweighting.
References:
- Pison, G., Van Aelst, S., & Willems, G. (2002). Small sample corrections for LTS and MCD.
Metrika, 55(1), 111-123.
- https://github.com/cran/robustbase/blob/c4b9d21cfc4beb64653bb2ffba9e549e2dbb98ed/R/ltsReg.R
"""
if alpha < 0.5 or alpha > 1:
raise ValueError(f"alpha must be between 0.5 and 1, but received {alpha}")
if p == 0: # intercept only
fp_500_n = 1 - np.exp(0.262024211897096) / n**0.604756680630497
fp_875_n = 1 - np.exp(-0.351584646688712) / n**1.01646567502486
if 0.5 <= alpha <= 0.875:
fp_alpha_n = fp_500_n + (fp_875_n - fp_500_n) / 0.375 * (alpha - 0.5)
else: # 0.875 < alpha < 1
fp_alpha_n = fp_875_n + (1 - fp_875_n) / 0.125 * (alpha - 0.875)
fp_alpha_n = np.sqrt(fp_alpha_n)
else:
if p == 1:
fp_500_n = 1 - np.exp(0.630869217886906) / n**0.650789250442946
fp_875_n = 1 - np.exp(0.565065391014791) / n**1.03044199012509
else:
coefgqpkwad875 = np.array(
[
[-0.458580153984614, 1.12236071104403, 3],
[-0.267178168108996, 1.1022478781154, 5],
]
)
coefeqpkwad500 = np.array(
[
[-0.746945886714663, 0.56264937192689, 3],
[-0.535478048924724, 0.543323462033445, 5],
]
)
y_500 = np.log(-coefeqpkwad500[:, 0] / (p ** coefeqpkwad500[:, 1]))
y_875 = np.log(-coefgqpkwad875[:, 0] / (p ** coefgqpkwad875[:, 1]))
A_500 = np.column_stack((np.ones(2), -np.log(coefeqpkwad500[:, 2] * p**2)))
coeffic_500 = np.linalg.solve(A_500, y_500)
A_875 = np.column_stack((np.ones(2), -np.log(coefgqpkwad875[:, 2] * p**2)))
coeffic_875 = np.linalg.solve(A_875, y_875)
fp_500_n = 1 - np.exp(coeffic_500[0]) / n ** coeffic_500[1]
fp_875_n = 1 - np.exp(coeffic_875[0]) / n ** coeffic_875[1]
if alpha <= 0.875:
fp_alpha_n = fp_500_n + (fp_875_n - fp_500_n) / 0.375 * (alpha - 0.5)
else:
fp_alpha_n = fp_875_n + (1 - fp_875_n) / 0.125 * (alpha - 0.875)
return 1 / fp_alpha_n
[docs]
def get_correction_factor_reweighting(p: int, n: int, alpha: float) -> float:
"""
Calculate the small sample correction factor for the scale resulting from LTS regression when
there is reweighting.
References:
- Pison, G., Van Aelst, S., & Willems, G. (2002). Small sample corrections for LTS and MCD.
Metrika, 55(1), 111-123.
- https://github.com/cran/robustbase/blob/c4b9d21cfc4beb64653bb2ffba9e549e2dbb98ed/R/ltsReg.R
"""
if alpha < 0.5 or alpha > 1:
raise ValueError(f"alpha must be between 0.5 and 1, but received {alpha}")
if p == 0: # intercept only
fp_500_n = 1 - np.exp(1.11098143415027) / n**1.5182890270453
fp_875_n = 1 - np.exp(-0.66046776772861) / n**0.88939595831888
if 0.5 <= alpha <= 0.875:
fp_alpha_n = fp_500_n + (fp_875_n - fp_500_n) / 0.375 * (alpha - 0.5)
else: # 0.875 < alpha < 1
fp_alpha_n = fp_875_n + (1 - fp_875_n) / 0.125 * (alpha - 0.875)
fp_alpha_n = np.sqrt(fp_alpha_n)
else:
if p == 1:
fp_500_n = 1 - np.exp(1.58609654199605) / n**1.46340162526468
fp_875_n = 1 - np.exp(0.391653958727332) / n**1.03167487483316
else:
coefgqpkwad875 = np.array(
[
[-0.474174840843602, 1.39681715704956, 3],
[-0.276640353112907, 1.42543242287677, 5],
]
)
coefeqpkwad500 = np.array(
[
[-0.773365715932083, 2.02013996406346, 3],
[-0.337571678986723, 2.02037467454833, 5],
]
)
y_500 = np.log(-coefeqpkwad500[:, 0] / (p ** coefeqpkwad500[:, 1]))
y_875 = np.log(-coefgqpkwad875[:, 0] / (p ** coefgqpkwad875[:, 1]))
A_500 = np.column_stack((np.ones(2), -np.log(coefeqpkwad500[:, 2] * p**2)))
coeffic_500 = np.linalg.solve(A_500, y_500)
A_875 = np.column_stack((np.ones(2), -np.log(coefgqpkwad875[:, 2] * p**2)))
coeffic_875 = np.linalg.solve(A_875, y_875)
fp_500_n = 1 - np.exp(coeffic_500[0]) / n ** coeffic_500[1]
fp_875_n = 1 - np.exp(coeffic_875[0]) / n ** coeffic_875[1]
if alpha <= 0.875:
fp_alpha_n = fp_500_n + (fp_875_n - fp_500_n) / 0.375 * (alpha - 0.5)
else:
fp_alpha_n = fp_875_n + (1 - fp_875_n) / 0.125 * (alpha - 0.875)
return 1 / fp_alpha_n