Source code for robpy.univariate.mcd

import numpy as np
import logging

from robpy.univariate.base import RobustScale
from scipy.stats import chi2, gamma


[docs] class UnivariateMCD(RobustScale): def __init__(self, alpha: float | int | None = None, consistency_correction: bool = True): """ Implementation of the :math:`O(n \\log n)` algorithm for the univariate MCD on pages 171-172 of Rousseeuw, P.J., & Leroy, A. (1987). Args: alpha (float | int | None, optional): Size of the h subset. If an integer between n/2 and n is passed, it is interpreted as an absolute value. If a float between 0.5 and 1 is passed, it is interpreted as a proportion of n (the training set size). If None or below [n/2] + 1, it is set to [n/2] + 1. Defaults to None. consistency_correction (boolean, optional): Whether the estimates should be consistent at the normal model. Defaults to True. References: - Rousseeuw, P.J., & Leroy, A. (1987). Robust Regression and Outlier Detection. John Wiley & Sons, New York. """ super().__init__() self.alpha = alpha self.consistency_correction = consistency_correction self.logger = logging.getLogger("UnivariateMCD") def _calculate(self, X: np.ndarray): self._set_h_size(X) if self.h_size == X.shape[0]: self.raw_location_ = self.location_ = X.mean() self.raw_scale_ = self.scale_ = X.std() self.raw_variance_ = self.variance_ = np.square(self.raw_scale_) return self.raw_variance_, self.raw_location_ = self._get_raw_estimates(X) self.variance_, self.location_ = self._reweighting(X) self.raw_scale_ = np.sqrt(self.raw_variance_) self.scale_ = np.sqrt(self.variance_) def _get_raw_estimates(self, X: np.ndarray): n = len(X) X = np.sort(X) var_best = np.inf index_best = 1 for i in range(n - self.h_size + 1): var_new = np.var(X[i : (i + self.h_size)]) if var_new < var_best: var_best = var_new index_best = i raw_var = var_best raw_loc = np.mean(X[index_best : (index_best + self.h_size)]) if self.consistency_correction: # [Minimum covariance determinant, Mia Hubert & Michiel Debruyne (2009)] raw_var = raw_var * (self.h_size / n) / chi2.cdf(chi2.ppf(self.h_size / n, df=1), df=3) return raw_var, raw_loc def _reweighting(self, X: np.ndarray): distances = (X - self.raw_location_) ** 2 / self.raw_variance_ mask = distances < chi2.ppf(0.975, df=1) loc = np.mean(X[mask]) var = np.var(X[mask]) if self.consistency_correction: # Croux & Haesbroeck (1999) delta = np.sum(mask) / len(X) var = var * delta * np.reciprocal(gamma.cdf(chi2.ppf(delta, df=1) / 2, a=3 / 2)) return var, loc def _set_h_size(self, X: np.ndarray): n = len(X) if self.alpha is None: self.h_size = n // 2 + 1 elif isinstance(self.alpha, int) and (n / 2 <= self.alpha <= n): if self.alpha < n // 2 + 1: self.logger.warning( f"h = alpha*n is too small and therefore set to [n/2] + 1" f" ({n // 2 + 1})." ) self.h_size = np.max([self.alpha, n // 2 + 1]) elif (isinstance(self.alpha, float) and (0.5 <= self.alpha <= 1)) or self.alpha == 1: if int(self.alpha * n) < n // 2 + 1: self.logger.warning( f"h = alpha*n is too small and therefore set to [n/2] + 1" f" ({n // 2 + 1})." ) self.h_size = np.max([int(self.alpha * n), n // 2 + 1]) else: raise ValueError( f"alpha must be an integer between n/2 and n or a float between 0.5 and 1, " f"but received {self.alpha}." )