from __future__ import annotations
import logging
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.stats import median_abs_deviation, chi2
from sklearn.base import RegressorMixin, BaseEstimator
from sklearn.exceptions import NotFittedError
from robpy.utils.distance import mahalanobis_distance
from robpy.covariance import FastMCD
logger = logging.getLogger(__name__)
[docs]
class RobustRegression(RegressorMixin, BaseEstimator):
def __init__(
self,
):
super().__init__()
[docs]
def fit(self, X, y) -> RobustRegression:
raise NotImplementedError
[docs]
def predict(self, X):
raise NotImplementedError
@property
def scale(self) -> float:
scale = getattr(self, "_scale", None)
if scale is None:
raise NotFittedError("Model has not been fitted yet.")
return scale
[docs]
def outlier_map(
self,
X,
y,
robust_scaling: bool = True,
robust_distance: bool = True,
vertical_outlier_threshold: float = 2.5,
leverage_threshold_percentile: float = 0.975,
figsize: tuple[int, int] = (4, 4),
return_data: bool = False,
) -> None | tuple[np.ndarray, np.ndarray, np.ndarray, float, float]:
"""Create a diagnostic plot where robust residuals are plotted against the robust
mahalabobis distances of the training data.
Args:
X (array like of shape (n_samples, n_features)): training features
y (array like of shape (n_samples, )): training targets
robust_scaling (bool): whether to scale residuals using MAD instead of std
robust_distance (bool): whether to use MCD as loc/scale estimator instead of mean/cov
for calculating the Mahalanobis distances
vertical_outlier_threshold: where to draw the upper (and lower) limit for
the standardized residuals to indicate outliers
leverage_threshold_percentile: which percentile from the chisquare distribution
to use to set as threshold for leverage points
figsize (tuple[int, int], optional): Size of the plot. Defaults to (10, 4).
return_data (bool, optional):
Whether to return the residuals, the standardized residuals and the distances.
Defaults to False.
"""
residuals = y.reshape(-1, 1) - self.predict(X).reshape(-1, 1)
standardized_residuals = (
residuals
/ (
median_abs_deviation(residuals, scale="normal")
if robust_scaling
else np.std(residuals)
)
).flatten()
if robust_distance:
mcd = FastMCD().fit(X)
covariance = mcd.covariance_
location = mcd.location_
else:
covariance = np.cov(X, rowvar=False)
location = np.mean(X, axis=0)
distances = mahalanobis_distance(X, location=location, covariance=covariance)
_, ax = plt.subplots(1, 1, figsize=figsize)
ax.scatter(x=distances, y=standardized_residuals)
ax.set_xlabel(f"{'Robust' if robust_distance else 'Non-robust'} distance of X")
ax.set_ylabel(f"{'Robust' if robust_scaling else 'Non-robust'} standardized residuals of y")
ax.axhline(vertical_outlier_threshold, ls="--", c="grey")
ax.axhline(-vertical_outlier_threshold, ls="--", c="grey")
df = X.shape[-1]
leverage_threshold = float(np.sqrt(chi2.ppf(leverage_threshold_percentile, df)))
ax.axvline(leverage_threshold, ls="--", c="grey")
if return_data:
return (
residuals,
standardized_residuals,
distances,
vertical_outlier_threshold,
leverage_threshold,
)
def _convert_input_to_array(X, y=None) -> tuple[np.ndarray, np.ndarray | None]:
if isinstance(X, pd.DataFrame):
X = X.values
if (len(X.shape) == 1) or (X.shape[1] == 1):
X = X.reshape(-1, 1)
if isinstance(y, pd.Series):
y = y.values
return X, y