Source code for robpy.regression.mm

from __future__ import annotations

import numpy as np
import pandas as pd

from sklearn.exceptions import NotFittedError
from sklearn.linear_model import LinearRegression

from robpy.regression.base import RobustRegression, _convert_input_to_array
from robpy.regression.s import SRegression
from robpy.utils.rho import BaseRho, TukeyBisquare


[docs] class MMRegression(RobustRegression): def __init__( self, initial_estimator: RobustRegression = SRegression(), rho: BaseRho = TukeyBisquare(c=3.44), max_iterations: int = 500, epsilon: float = 1e-7, ): """ Implementation of MM-regression estimator of Yohai, V. J. (1987). Args: initial_estimator (RobustRegression, optional): Initial regression estimator. Defaults to SRegression. rho (BaseRho, optional): The rho-function used for the MM-estimate. Defaults to TukeyBisquare(c=3.44). max_iterations (int, optional): Maximum number of iterations. Defaults to 500. epsilon (float, optional): If the absolute difference between all the new and old residuals in an iteration is below epsilon, we stop the computation. Defautls to 1e-7. References: - Yohai, V. J. (1987). High breakdown-point and high efficiency robust estimates for regression. The Annals of statistics, 15(2), 642-656. """ self.initial_estimator = initial_estimator self.rho = rho self.max_iterations = max_iterations self.epsilon = epsilon self.model = None
[docs] def fit(self, X: np.ndarray | pd.DataFrame, y: np.ndarray | pd.Series) -> MMRegression: X, y = _convert_input_to_array(X, y) self.model = self.initial_estimator.fit(X, y) self._scale = self.model.scale self.n_iter = 0 for i in range(self.max_iterations): residuals = y - self.model.predict(X) scaled_residuals = residuals / self._scale weights = self.rho.psi(scaled_residuals) / scaled_residuals self.model = LinearRegression().fit( np.sqrt(weights).reshape(-1, 1) * X, np.sqrt(weights) * y ) new_residuals = y - self.model.predict(X) self.n_iter = i + 1 if np.max(np.abs(new_residuals - residuals)) < self.epsilon: break return self
[docs] def predict(self, X: np.ndarray | pd.DataFrame) -> np.ndarray: if self.model is None: raise NotFittedError X, _ = _convert_input_to_array(X) return self.model.predict(X)