Source code for robpy.covariance.mcd

import logging
import numpy as np
import math as math
import warnings

from dataclasses import dataclass
from scipy.linalg import sqrtm
from scipy.stats import chi2, gamma, rankdata, norm

from robpy.covariance.base import RobustCovariance
from robpy.utils.distance import mahalanobis_distance
from robpy.utils.logging import get_logger
from robpy.univariate.qn import Qn
from robpy.covariance.ogk import OGK
from robpy.univariate.tau import Tau


[docs] @dataclass class HSubset: indices: np.ndarray location: np.ndarray scale: np.ndarray determinant: float n_c_steps: int = 0
[docs] class FastMCD(RobustCovariance): def __init__( self, *, alpha: float | int | None = None, n_initial_subsets: int = 500, n_initial_c_steps: int = 2, n_best_subsets: int = 10, n_partitions: int | None = None, tolerance: float = 1e-8, correct_covariance: bool = True, reweighting: bool = True, verbosity: int = logging.WARNING, store_precision=True, assume_centered=False, random_seed: int | None = None, ): """ Fast MCD estimator based on the algorithm proposed in Rousseeuw, P. J., & Van Driessen, K. (1999). Args: alpha (float | int | None, optional): Size of the h subset. If an integer between n/2 and n is passed, it is interpreted as h. If a float between 0.5 and 1 is passed, it is interpreted as a proportion of n (the training set size). If None or an integer below [(n+p+1)/2], h is set to [(n+p+1)/2]. Defaults to None. n_initial_subsets (int, optional): Number of initial random subsets of size p+1. Defaults to 500. n_initial_c_steps (int, optional): Number of initial c steps to perform on all initial subsets. Defaults to 2. n_best_subsets (int, optional): Number of best subsets to keep and perform c steps on until convergence. Defaults to 10. n_partitions (int, optional): Number of partitions to split the data into. This can speed up the algorithm for large datasets (n > 600 suggested in paper). If None, 5 partitions are used if n > 600, otherwise 1 partition is used. tolerance (float, optional): Minimum difference in determinant between two iterations to stop the C-step. Defaults to 1e-8. correct_covariance (bool, optional): Whether to apply a consistency correction to the raw covariance estimate. Defaults to True. reweighting (bool, optional): Whether to apply reweighting to the raw covariance estimate. Defaults to True. random_seed (int | None, optional): Can be used to provide a random seed. Defaults to None. References: - Rousseeuw, P. J., & Van Driessen, K. (1999). A fast algorithm for the minimum covariance determinant estimator. Technometrics, 41(3), 212-223. """ super().__init__(store_precision=store_precision, assume_centered=assume_centered) self.alpha = alpha self.n_initial_subsets = n_initial_subsets self.n_initial_c_steps = n_initial_c_steps self.n_best_subsets = n_best_subsets self.n_partitions = n_partitions self.tolerance = tolerance self.correct_covariance = correct_covariance self.reweighting = reweighting self.logger = get_logger("FastMCD", level=verbosity) self.verbosity = verbosity self.random_seed = random_seed
[docs] def calculate_covariance(self, X) -> np.ndarray: if self.alpha == 1 or self.alpha == X.shape[0]: self.logger.warning(f"Default covariance is returned as alpha is {self.alpha}.") self.location_ = X.mean(0) return np.cov(X, rowvar=False) self.rng = np.random.default_rng(self.random_seed) # partition data (n_partitions > 1 can speed up algorithm for large datasets) partitions = self._partition_data(X) self.logger.info(f"Partitioned data into {len(partitions)} partitions") n_initial_subsets = self.n_initial_subsets // len(partitions) best_subsets = [] # perform initial c steps on all initial subsets for data in partitions: subsets = self._get_initial_subsets(data, n_initial_subsets) subsets = [ self._perform_multiple_c_steps(subset, data, n_iterations=self.n_initial_c_steps) for subset in subsets ] best_subsets.extend(sorted(subsets, key=lambda x: x.determinant)[: self.n_best_subsets]) self.logger.info(f"Selecting {self.n_best_subsets} best subsets from {len(best_subsets)}") # perform additional c-steps on the best subsets best_subsets = [ self._perform_multiple_c_steps(subset, X, n_iterations=self.n_initial_c_steps) for subset in best_subsets ] best_subsets = sorted(best_subsets, key=lambda x: x.determinant)[: self.n_best_subsets] best_subset = best_subsets[0] # reference subset # perform c-steps until convergence for subset in best_subsets: while True: new_subset = self._perform_c_step(subset, X) if ( np.all(new_subset.indices == subset.indices) or (new_subset.determinant - subset.determinant) < self.tolerance ): break subset = new_subset if subset.determinant < best_subset.determinant: best_subset = subset self.best_subset = best_subset # post processing self.location_ = best_subset.location scale = best_subset.scale if self.correct_covariance: self.logger.debug("Applying consistency correction to the raw covariance estimate") scale = self._correct_covariance(X, best_subset) if self.reweighting: self.logger.debug("Applying reweighting to the raw covariance estimate") distances = mahalanobis_distance(X, best_subset.location, scale) self._mask = distances < np.sqrt(chi2.ppf(0.975, X.shape[1])) self.location_ = X[self._mask].mean(axis=0) scale = np.cov(X[self._mask], rowvar=False) if self.correct_covariance: # See Croux & Haesbroeck 1999 and R code for robustbase>covMcd>MCDCons # (https://rdrr.io/cran/robustbase/src/R/covMcd.R) self.logger.debug("Applying consistency correction after reweighting.") factor = 1 / ( gamma.cdf((chi2.ppf(0.975, X.shape[1]) / 2), X.shape[1] / 2 + 1) / 0.975 ) scale *= factor return scale
def _correct_covariance(self, X: np.ndarray, best_subset: HSubset) -> np.ndarray: self._rescale_factor = np.median( np.square(mahalanobis_distance(X, best_subset.location, best_subset.scale)) ) / chi2.ppf(0.5, X.shape[1]) return self._rescale_factor * best_subset.scale def _perform_multiple_c_steps( self, subset: HSubset, X: np.ndarray, n_iterations: int ) -> HSubset: for _ in range(n_iterations): subset = self._perform_c_step(subset, X) return subset def _perform_c_step(self, subset: HSubset, X: np.ndarray) -> HSubset: """ Perform a single C-step on the subset of the data Args: subset (HSubset): indices of the current subset. X (np.ndarray): data. Returns: HSubset: indices for new h subset. """ # Calculate the Mahalanobis distances mahalanobis = mahalanobis_distance(X, subset.location, subset.scale) # Find the alpha (h_size) smallest distances h = self._get_h(X) idx = np.argsort(mahalanobis)[:h] return self._get_subset(indices=idx, X=X, n_c_steps=subset.n_c_steps + 1) def _get_h(self, X: np.ndarray) -> int: n, p = X.shape if self.alpha is None: return int((n + p + 1) / 2) elif isinstance(self.alpha, int) and (n / 2 <= self.alpha <= n): if self.alpha < int((n + p + 1) / 2): warnings.warn( f"h is too small and therefore set to [(n+p+1)/2] ({int((n + p + 1) / 2)}).", category=UserWarning, stacklevel=2, ) return np.max([self.alpha, int((n + p + 1) / 2)]) elif (isinstance(self.alpha, float) and (0.5 <= self.alpha <= 1)) or self.alpha == 1: if int(self.alpha * n) < int((n + p + 1) / 2): warnings.warn( f"h = alpha*n is too small and therefore set to [(n+p+1)/2]" f" ({int((n + p + 1) / 2)}).", category=UserWarning, stacklevel=2, ) return np.max([int(self.alpha * n), int((n + p + 1) / 2)]) else: raise ValueError( f"alpha must be an integer between n/2 and n or" f" a float between 0.5 and 1, but received {self.alpha}." ) def _partition_data(self, X: np.ndarray) -> list[np.ndarray]: if self.n_partitions is None: n_partitions = 5 if X.shape[0] > 600 else 1 else: n_partitions = self.n_partitions return np.array_split(X, n_partitions) def _get_subset( self, indices: np.ndarray, X: np.ndarray, n_c_steps: int = 0, ensure_non_singular: bool = False, ) -> HSubset: """ Construct an HSubset from a set of data indices and calculate location, scale and determinant. Args: - indices (np.ndarray): data indices. - X (np.ndarray): complete dataset. - n_c_steps (int): will be passed directly to the HSubset. - ensure_non_singular (bool): whether to resample in case the determinant is 0 (relevant for sampling initial subsets). """ mu = X[indices].mean(axis=0) cov = np.cov(X[indices], rowvar=False) det = np.linalg.det(cov) if ensure_non_singular: while math.isclose(det, 0): new_index = self.rng.choice(np.delete(np.arange(X.shape[0]), indices)) indices = np.append(indices, new_index) mu = X[indices].mean(axis=0) cov = np.cov(X[indices], rowvar=False) det = np.linalg.det(cov) return HSubset(indices, mu, cov, det, n_c_steps=n_c_steps) def _get_initial_subsets(self, X: np.ndarray, n_subsets: int) -> list[HSubset]: return [ self._get_subset( indices=self.rng.choice(X.shape[0], X.shape[1] + 1, replace=False), X=X, ensure_non_singular=True, ) for _ in range(n_subsets) ]
[docs] class DetMCD(RobustCovariance): def __init__( self, *, alpha: float | int | None = None, tolerance: float = 1e-8, correct_covariance: bool = True, reweighting: bool = True, verbosity: int = logging.WARNING, ): """ Deterministic MCD estimator (DetMCD) based on the algorithm proposed in Hubert, M., Rousseeuw, P. J., & Verdonck, T. (2012). Args: alpha (float | int | None, optional): Size of the h subset. If an integer between n/2 and n is passed, it is interpreted as h. If a float between 0.5 and 1 is passed, it is interpreted as a proportion of n (the training set size). If None or an integer below [(n+p+1)/2], h is set to [(n+p+1)/2]. Defaults to None. tolerance (float, optional): Minimum difference in determinant between two iterations to stop the C-step. Defaults to 1e-8. correct_covariance (bool, optional): Whether to apply a consistency correction to the raw covariance estimate. Defaults to True. reweighting (bool, optional): Whether to apply reweighting to the raw covariance estimate. Defaults to True. References: - Hubert, M., Rousseeuw, P. J., & Verdonck, T. (2012). A deterministic algorithm for robust location and scatter. Journal of Computational and Graphical Statistics, 21(3), 618-637. """ super().__init__() self.alpha = alpha self.tolerance = tolerance self.correct_covariance = correct_covariance self.reweighting = reweighting self.logger = get_logger("DetMCD", level=verbosity) self.verbosity = verbosity
[docs] def calculate_covariance(self, X: np.ndarray) -> np.ndarray: if self.alpha == 1 or self.alpha == X.shape[0]: self.logger.warning(f"Default covariance is returned as alpha is {self.alpha}.") self.location_ = X.mean(0) return np.cov(X, rowvar=False) n, p = X.shape # Step 0: standardize X if n < 1000: Z = (X - np.median(X, axis=0)) / self._Qn_scale(X) else: Z = (X - np.median(X, axis=0)) / self._tau_scale(X) # Steps 1-3: best_subsets = self._get_initial_best_subsets(Z, X, n, p) # Step 4: C-steps until convergence best_subset = best_subsets[0] # reference subset for subset in best_subsets: while True: new_subset = self._perform_c_step(subset, X) if ( np.all(new_subset.indices == subset.indices) or (new_subset.determinant - subset.determinant) < self.tolerance ): break subset = new_subset if subset.determinant < best_subset.determinant: best_subset = subset self.best_subset = best_subset # Step 5: post processing self.location_ = best_subset.location scale = best_subset.scale if self.correct_covariance: self.logger.debug("Applying consistency correction to the raw covariance estimate") scale = self._correct_covariance(X, best_subset) if self.reweighting: self.logger.debug("Applying reweighting to the raw covariance estimate") distances = mahalanobis_distance(X, best_subset.location, scale) self._mask = distances < np.sqrt(chi2.ppf(0.975, X.shape[1])) self.location_ = X[self._mask].mean(axis=0) scale = np.cov(X[self._mask], rowvar=False) if self.correct_covariance: # See Croux & Haesbroeck 1999 and R code for robustbase>covMcd>MCDCons # (https://rdrr.io/cran/robustbase/src/R/covMcd.R) self.logger.debug("Applying consistency correction after reweighting.") factor = 1 / ( gamma.cdf((chi2.ppf(0.975, X.shape[1]) / 2), X.shape[1] / 2 + 1) / 0.975 ) scale *= factor return scale
def _Qn_scale(self, X: np.ndarray, axis=0): if X.ndim == 1: return Qn().fit(X).scale elif axis == 0: return [Qn().fit(col).scale for col in X.T] elif axis == 1: return [Qn().fit(col).scale for col in X] else: raise ValueError(f"axis {axis} not supported") def _tau_scale(self, X: np.ndarray, axis=0): if X.ndim == 1: return Tau().fit(X).scale elif axis == 0: return [Tau().fit(col).scale for col in X.T] elif axis == 1: return [Tau().fit(col).scale for col in X] else: raise ValueError(f"axis {axis} not supported") def _get_initial_best_subsets(self, Z: np.ndarray, X: np.ndarray, n: int, p: int): # Step 1: construct 6 preliminary estimates Sk of covariance or correlation Y = np.tanh(Z) S1 = np.corrcoef(Y, rowvar=False) R = rankdata(Z, axis=0) S2 = np.corrcoef(R, rowvar=False) S3 = np.corrcoef(norm.ppf((R - 1 / 3) / (n + 1 / 3)), rowvar=False) znorm = np.sqrt(np.sum(Z * Z, axis=1)) w = 1 / znorm S4 = np.dot((Z * w[:, np.newaxis]).T, (Z * w[:, np.newaxis])) / n idx = np.argsort(znorm)[: math.ceil(n / 2)] S5 = np.cov(Z[idx, :], rowvar=False) S6 = OGK( location_estimator=np.median, scale_estimator=self._Qn_scale, reweighting=False ).calculate_covariance(Z) estimates_S = [S1, S2, S3, S4, S5, S6] # Step 2: construct 6 initial location and scatter estimates estimates_sigma = [] estimates_mu = [] for S in estimates_S: _, E = np.linalg.eigh(S) E = E[:, np.arange(p - 1, -1, -1)] B = Z @ E L = np.diag(np.power(self._Qn_scale(B), 2)) cov = E @ L @ E.T estimates_sigma.append(cov) root_cov = sqrtm(cov) inv_root_cov = np.linalg.inv(root_cov) mu = root_cov @ np.median(Z @ inv_root_cov, axis=0) estimates_mu.append(mu) # Step 3: calculate statistical distances best_subsets = [] for mu, cov in zip(estimates_mu, estimates_sigma): idx_h0 = np.argsort(mahalanobis_distance(Z, mu, cov))[: math.ceil(n / 2)] H = self._get_subset(idx_h0, X) # h0 H = self._perform_c_step(H, X) # h best_subsets.append(H) return best_subsets # TODO: de functies hieronder moeten nog naar ergens anders. def _get_h(self, X: np.ndarray) -> int: n, p = X.shape if self.alpha is None: return int((n + p + 1) / 2) elif isinstance(self.alpha, int) and (n / 2 <= self.alpha <= n): if self.alpha < int((n + p + 1) / 2): warnings.warn( f"h is too small and therefore set to [(n+p+1)/2] ({int((n + p + 1) / 2)}).", category=UserWarning, stacklevel=2, ) return np.max([self.alpha, int((n + p + 1) / 2)]) elif (isinstance(self.alpha, float) and (0.5 <= self.alpha <= 1)) or self.alpha == 1: if int(self.alpha * n) < int((n + p + 1) / 2): warnings.warn( f"h = alpha*n is too small and therefore set to [(n+p+1)/2]" f" ({int((n + p + 1) / 2)}).", category=UserWarning, stacklevel=2, ) return np.max([int(self.alpha * n), int((n + p + 1) / 2)]) else: raise ValueError( f"alpha must be an integer between n/2 and n or" f" a float between 0.5 and 1, but received {self.alpha}." ) def _get_subset( self, indices: np.ndarray, X: np.ndarray, n_c_steps: int = 0, ensure_non_singular: bool = False, ) -> HSubset: """Construct an HSubset from a set of data indices and calculate location, scale and determinant. Args: - indices (np.ndarray): data indices. - X (np.ndarray): complete dataset. - n_c_steps (int): will be passed directly to the HSubset. - ensure_non_singular (bool): whether to resample in case the determinant is 0 (relevant for sampling initial subsets). Returns: Hsubset. """ mu = X[indices].mean(axis=0) cov = np.cov(X[indices], rowvar=False) det = np.linalg.det(cov) if ensure_non_singular: while math.isclose(det, 0): new_index = np.random.choice(np.delete(np.arange(X.shape[0]), indices)) indices = np.append(indices, new_index) mu = X[indices].mean(axis=0) cov = np.cov(X[indices], rowvar=False) det = np.linalg.det(cov) return HSubset(indices, mu, cov, det, n_c_steps=n_c_steps) def _perform_c_step(self, subset: HSubset, X: np.ndarray) -> HSubset: """Perform a single C-step on the subset of the data Args: subset (HSubset): indices of the current subset. X (np.ndarray): data. Returns: HSubset: indices for new h subset. """ # Calculate the Mahalanobis distances mahalanobis = mahalanobis_distance(X, subset.location, subset.scale) # Find the alpha (h_size) smallest distances h = self._get_h(X) idx = np.argsort(mahalanobis)[:h] return self._get_subset(indices=idx, X=X, n_c_steps=subset.n_c_steps + 1) def _correct_covariance(self, X: np.ndarray, best_subset: HSubset) -> np.ndarray: self._rescale_factor = np.median( np.square(mahalanobis_distance(X, best_subset.location, best_subset.scale)) ) / chi2.ppf(0.5, X.shape[1]) return self._rescale_factor * best_subset.scale